In [4]:
import pandas as pd
import numpy as np
import random
# Seed Python's built-in `random` module for repeatable runs.
# Note: this call does not seed NumPy or PyTorch.
random.seed(42)

import sys
# Add the sibling ../code directory to the import path so the project-local
# `utils` module imported on the next line can be found.
sys.path.append('../code')
from utils import preprocess_text

In [None]:
# Load the transcriptions and derive a cleaned-text column via `preprocess_text`.
df = pd.read_csv("../data/mtsamples.csv")
df['transcription_clean'] = df['transcription'].apply(preprocess_text)

# Count how often each whitespace-separated token occurs across all documents.
# Insertion order (first occurrence in the corpus) is kept by the dict.
vocab = {}
for document in df["transcription_clean"]:
    for token in document.split():
        if token in vocab:
            vocab[token] += 1
        else:
            vocab[token] = 1
len(vocab)

# Keep only tokens seen strictly more than `k` times.
# NOTE(review): the original remark here was "same order of magnitude as n" —
# presumably the filtered vocabulary size vs. the number of samples; confirm.
k = 10
vocab_filtered = [token for token, count in vocab.items() if count > k]
len(vocab_filtered)

In [None]:
# Restrict the data to specialties that have more than 100 samples.
prev_count = len(df)
top_specialties = df["medical_specialty"].value_counts()
top_specialties = top_specialties.loc[top_specialties > 100]
classes = top_specialties.index.tolist()
df = df[df["medical_specialty"].isin(classes)]
print(f"Went from {prev_count} samples to {len(df)} samples")

# Map each retained specialty name to its position in `classes`.
class_dict = dict(zip(classes, range(len(classes))))

def one_hot_encode(specialty):
    """Return a one-hot integer vector for `specialty`.

    The vector has one slot per entry of the module-level `classes` list and
    a single 1 at the position given by `class_dict[specialty]`.

    Raises:
        KeyError: if `specialty` is not a key of `class_dict`.
    """
    n_classes = len(classes)
    hot_position = class_dict[specialty]
    return (np.arange(n_classes) == hot_position).astype(int)

# One one-hot row per sample, in the same row order as `df`,
# stacked into a 2-D array of shape (n_samples, n_classes).
encoded_specialties = [one_hot_encode(name) for name in df["medical_specialty"]]
y = np.stack(encoded_specialties, axis=0)
y

In [None]:
from collections import Counter
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

# Build the token -> index table. The special tokens take the lowest indices
# (<PAD> = 0, <UNK> = 1); the filtered vocabulary words follow, in order.
special_tokens = ["<PAD>", "<UNK>"]
word_to_idx = {
    word: idx
    for idx, word in enumerate(vocab_filtered, start=len(special_tokens))
}
word_to_idx.update({token: idx for idx, token in enumerate(special_tokens)})
vocab_size = len(word_to_idx)

# Convert a text into a sequence of vocabulary indices
def tokenize(text):
    """Map each whitespace-separated word of `text` to its index in `word_to_idx`.

    Words that are not in `word_to_idx` are mapped to the index of "<UNK>".
    Returns a list of ints (empty when `text` contains no words).
    """
    indices = []
    for word in text.split():
        # The fallback is looked up per word, so an empty text never touches it.
        indices.append(word_to_idx.get(word, word_to_idx["<UNK>"]))
    return indices

# Create a custom dataset
class TextDataset(Dataset):
    """Dataset of tokenized texts paired with their labels.

    Args:
        texts: iterable of strings; each one is converted to a list of
            vocabulary indices with `tokenize` once, at construction time.
        labels: indexable collection of labels, aligned with `texts`.
    """

    def __init__(self, texts, labels):
        self.texts = [tokenize(text) for text in texts]
        self.labels = labels

    def __len__(self):
        """Number of samples (taken from `labels`)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return `(token_ids, label)` for sample `idx`.

        `token_ids` is always an int64 tensor. The dtype is pinned explicitly
        because `torch.tensor([])` would otherwise produce a float32 tensor
        for a text with no tokens, which an embedding layer cannot consume.
        """
        return torch.tensor(self.texts[idx], dtype=torch.long), self.labels[idx]

# Collate function for padding sequences
def collate_batch(batch, pad_idx=None):
    """Pad a batch of variable-length token sequences into one tensor.

    Args:
        batch: sequence of `(token_ids, label)` pairs, where `token_ids` is a
            1-D tensor or a list of ints.
        pad_idx: index used to fill the padded positions. When omitted, the
            index of "<PAD>" in the module-level `word_to_idx` is used.

    Returns:
        `(texts_padded, labels)`: an int64 tensor of shape
        `[batch_size, longest_sequence]` and an int64 tensor of labels.
    """
    if pad_idx is None:
        pad_idx = word_to_idx["<PAD>"]
    texts, labels = zip(*batch)
    # `as_tensor` reuses tensors that are already int64 instead of copying them
    # through `torch.tensor(tensor)`, which emits a copy-construct UserWarning
    # for every sample. Pinning the dtype also keeps empty sequences integral.
    sequences = [torch.as_tensor(seq, dtype=torch.long) for seq in texts]
    texts_padded = pad_sequence(sequences, batch_first=True, padding_value=pad_idx)
    return texts_padded, torch.tensor(labels, dtype=torch.long)

# Random train/test split.
# The shuffle uses its own seeded RNG rather than the global `random` state, so
# re-running this cell reproduces the same split. With the global RNG, a second
# run would silently produce a different split, and a model already trained on
# the first split would then be evaluated on rows it had seen. On a fresh kernel
# this yields the same permutation as `random.seed(42)` + `random.shuffle`,
# provided nothing else has drawn from the global RNG in between.
SPLIT_SEED = 42
TRAIN_FRACTION = 0.85  # 85% train, 15% test
BATCH_SIZE = 64

# `y` is indexed positionally alongside `df.iloc`, so the two must be aligned.
assert len(y) == len(df), "y and df are out of sync; re-run the label-encoding cell"

indices = list(range(len(df)))
random.Random(SPLIT_SEED).shuffle(indices)

# Calculate split point
split_idx = int(TRAIN_FRACTION * len(indices))
train_indices = indices[:split_idx]
test_indices = indices[split_idx:]
assert not set(train_indices) & set(test_indices), "train/test indices overlap"

y_train = y[train_indices]
y_test = y[test_indices]

# Create datasets and dataloaders.
# Labels are passed as integer class ids (argmax of the one-hot rows).
train_dataset = TextDataset(df.iloc[train_indices]['transcription_clean'], y_train.argmax(axis=1))
test_dataset = TextDataset(df.iloc[test_indices]['transcription_clean'], y_test.argmax(axis=1))

train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, collate_fn=collate_batch)

In [36]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence

class EmbeddingClassifier(nn.Module):
    """Embedding -> LSTM -> dropout -> linear classifier over token-id sequences.

    Args:
        vocab_size: number of rows in the embedding table.
        embedding_dim: size of each token embedding.
        hidden_dim: LSTM hidden size (per direction).
        output_dim: number of output logits.
        num_layers: number of stacked LSTM layers.
        bidirectional: whether the LSTM runs in both directions.
        dropout: dropout probability applied to the final hidden state, and
            between LSTM layers when `num_layers > 1`.
        padding_idx: token id used for padding (0 is the index of "<PAD>" in
            this notebook's vocabulary). Padded positions get a fixed zero
            embedding and are skipped by the LSTM. Pass `None` to treat every
            position as real input.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers=1, bidirectional=False, dropout=0.5, padding_idx=0):
        super().__init__()
        self.padding_idx = padding_idx

        # Embedding layer; the padding row stays at zero and receives no gradient.
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)

        # RNN layer (using LSTM). Inter-layer dropout only applies when there
        # is more than one layer, so it is disabled otherwise.
        self.rnn = nn.LSTM(embedding_dim,
                           hidden_dim,
                           num_layers=num_layers,
                           bidirectional=bidirectional,
                           dropout=dropout if num_layers > 1 else 0,
                           batch_first=True)

        # Output layer; a bidirectional LSTM contributes two hidden states.
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)

        # Dropout on the sequence representation
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Compute class logits for a batch of padded token-id sequences.

        Args:
            text: integer tensor of shape `[batch_size, seq_len]`; when
                `padding_idx` is set, padding is expected at the end of each row.

        Returns:
            Tensor of shape `[batch_size, output_dim]` with unnormalized logits.
        """
        embedded = self.embedding(text)  # [batch_size, seq_len, embedding_dim]

        if self.padding_idx is not None:
            # Without packing, the LSTM keeps stepping through the trailing
            # padding, so the "final" hidden state depends on how much padding
            # the batch happened to add. Packing stops each sequence at its
            # true length. Lengths are clamped to 1 because packing rejects
            # zero-length sequences, and must live on the CPU.
            lengths = (text != self.padding_idx).sum(dim=1).clamp(min=1).cpu()
            rnn_input = nn.utils.rnn.pack_padded_sequence(
                embedded, lengths, batch_first=True, enforce_sorted=False
            )
        else:
            rnn_input = embedded

        _, (hidden, _) = self.rnn(rnn_input)

        # `hidden` is [num_layers * num_directions, batch_size, hidden_dim].
        if self.rnn.bidirectional:
            # Concatenate the last layer's forward and backward final states.
            hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        else:
            hidden = hidden[-1, :, :]

        hidden = self.dropout(hidden)
        return self.fc(hidden)

In [None]:
# Seed PyTorch's global RNG before anything stochastic happens. The notebook
# otherwise seeds only Python's `random`, so weight initialization, dropout
# masks and the shuffled batch order (the training DataLoader is built without
# its own generator) would differ on every run.
torch.manual_seed(42)

# Initialize model
embedding_dim = 32
hidden_dim = 32
model = EmbeddingClassifier(vocab_size, embedding_dim, hidden_dim, len(classes), bidirectional=False)

# Loss and optimizer. CrossEntropyLoss takes raw logits and integer class ids.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

# Training loop
num_epochs = 100
for epoch in range(num_epochs):
    # Re-enable training mode each epoch (dropout active).
    model.train()
    total_loss = 0.0

    for texts, labels in train_dataloader:
        # Forward pass
        outputs = model(texts)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    # Report the mean of the per-batch losses for this epoch.
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_dataloader):.4f}')

In [None]:
# Evaluate on the held-out split.
# Switch to inference mode first: the training loop leaves the model in
# train mode, and without this call dropout would keep zeroing activations
# at random, making the reported test loss and accuracy noisy and pessimistic.
model.eval()

test_loss, correct = 0.0, 0
with torch.no_grad():
    for texts, labels in test_dataloader:
        outputs = model(texts)
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        preds = torch.argmax(outputs, dim=1)
        # `.item()` keeps `correct` a plain Python int instead of a tensor.
        correct += (preds == labels).sum().item()

# Mean of the per-batch losses (the last batch may be smaller than the others).
test_loss = test_loss/len(test_dataloader)
accuracy = correct / len(y_test)
print(f"Test loss: {test_loss:.4f}")
print(f"Test accuracy: {accuracy:.2f}")