In an interactive notebook, the `spark` object is already created.
The instructors tested this notebook with 1 driver and 6 executors of the small e4 type (24 cores, 192 GB memory).

### Launch spark environment

In [1]:
import pandas as pd
from sklearn.model_selection import train_test_split
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification

# Load the labelled examples. The frame must have exactly two columns, which
# are renamed to 'body' (the text) and 'label' (the class).
df = pd.read_csv('train_dataset.csv')
df.columns = ['body', 'label']

# Hold out 20% of the rows for validation. `random_state` pins the shuffle so
# the same rows land in each split on every Restart & Run All; without it the
# validation loss is measured on a different sample each run.
train_texts, val_texts, train_labels, val_labels = train_test_split(
    df['body'],
    df['label'],
    test_size=0.2,
    random_state=42,
)

# Tokenize both splits with the tokenizer that matches the pretrained model.
# truncation=True clips over-long texts to the model's maximum length and
# padding=True pads every sequence to the longest one within each call.
tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
train_encodings = tokenizer(train_texts.tolist(), truncation=True, padding=True)
val_encodings = tokenizer(val_texts.tolist(), truncation=True, padding=True)



In [2]:
import torch
class IMDbDataset(torch.utils.data.Dataset):
    """Map-style dataset pairing tokenizer encodings with binary labels.

    Args:
        encodings: Mapping from feature name to an indexable collection holding
            one entry per example (e.g. the output of a tokenizer call).
        labels: Iterable of raw labels. A label equal to the string
            'positive' becomes 1; every other value becomes 0.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        # Convert the raw labels to 1 / 0 once, up front, and keep them in a
        # plain list so they can be indexed by position.
        binary_labels = []
        for raw_label in labels:
            if raw_label == 'positive':
                binary_labels.append(1)
            else:
                binary_labels.append(0)
        self.labels = binary_labels

    def __len__(self):
        """Return the number of examples (one per label)."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return example `idx` as a dict of tensors, including a 'labels' entry."""
        sample = {}
        for feature_name, feature_values in self.encodings.items():
            sample[feature_name] = torch.tensor(feature_values[idx])
        # Scalar tensor holding the 1 / 0 class id.
        sample['labels'] = torch.tensor(self.labels[idx])
        return sample
# Wrap each tokenized split together with its labels so a DataLoader can batch it.
train_dataset, val_dataset = (
    IMDbDataset(train_encodings, train_labels),
    IMDbDataset(val_encodings, val_labels),
)


In [3]:
import torch
from transformers import DistilBertForSequenceClassification, DistilBertTokenizer, AdamW, get_linear_schedule_with_warmup
from torch.utils.data import DataLoader
from torch.optim import AdamW
from transformers import AdamW


# Seed torch before anything random happens so the freshly initialised
# classifier head and the shuffled batch order repeat across kernel restarts.
torch.manual_seed(42)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)
model = DistilBertForSequenceClassification.from_pretrained('distilbert-base-uncased', num_labels=2).to(device)

# Batch the datasets; only the training split is shuffled.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=16, num_workers=0)

# Set up the optimizer.
# This cell imports `AdamW` from both `transformers` and `torch.optim`, so the
# bare name depends on import order (the last import, from `transformers`,
# wins). That implementation is deprecated in favour of the PyTorch one, so the
# optimizer is named explicitly. eps and weight_decay are set to the defaults
# the transformers implementation used, keeping the hyperparameters unchanged.
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5, eps=1e-6, weight_decay=0.0)
# NOTE(review): the recorded output of this cell shows "Starting Epoch 1/10"
# and the checkpoint is saved as 'db10.pt', so this value appears to have been
# edited after the last run - confirm the intended epoch count.
epochs = 20
total_steps = len(train_loader) * epochs
# Linear decay of the learning rate to zero over all optimizer steps, no warmup.
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

# Train: one pass over train_loader, then one pass over val_loader, per epoch.
for epoch in range(epochs):
    model.train()
    total_train_loss = 0
    total_train_steps = len(train_loader)
    print(f"Starting Epoch {epoch + 1}/{epochs}")

    for step, batch in enumerate(train_loader):
        # Move every tensor of the batch to the model's device.
        batch = {k: v.to(device) for k, v in batch.items()}

        # The batch carries 'labels', so the model returns the loss directly.
        outputs = model(**batch)
        loss = outputs.loss

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

        # Read the scalar once and reuse it for the running total and the log.
        batch_loss = loss.item()
        total_train_loss += batch_loss

        # Print progress every 5 training steps.
        if (step + 1) % 5 == 0:
            progress = (step + 1) / total_train_steps * 100
            print(f"Epoch {epoch + 1}, Step {step + 1}/{total_train_steps} ({progress:.2f}%), Loss: {batch_loss:.4f}")

    # Validation: same forward pass, but in eval mode and without gradients.
    model.eval()
    total_val_loss = 0
    total_val_steps = len(val_loader)

    with torch.no_grad():
        for step, batch in enumerate(val_loader):
            batch = {k: v.to(device) for k, v in batch.items()}

            outputs = model(**batch)
            loss = outputs.loss

            batch_loss = loss.item()
            total_val_loss += batch_loss

            # Print progress every 10 validation steps.
            if (step + 1) % 10 == 0:
                progress = (step + 1) / total_val_steps * 100
                print(f"Validation Step {step + 1}/{total_val_steps} ({progress:.2f}%), Validation Loss: {batch_loss:.2f}")

    # Mean of the per-batch losses. An empty loader yields NaN instead of
    # raising ZeroDivisionError at the end of an epoch.
    avg_train_loss = total_train_loss / total_train_steps if total_train_steps else float("nan")
    avg_val_loss = total_val_loss / total_val_steps if total_val_steps else float("nan")

    print(f"End of Epoch {epoch + 1}/{epochs} - Average Train Loss: {avg_train_loss:.2f}, Average Validation Loss: {avg_val_loss:.2f}")


Using device: cuda


Some weights of the model checkpoint at distilbert-base-uncased were not used when initializing DistilBertForSequenceClassification: ['vocab_projector.weight', 'vocab_layer_norm.weight', 'vocab_projector.bias', 'vocab_transform.bias', 'vocab_layer_norm.bias', 'vocab_transform.weight']
- This IS expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DistilBertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DistilBertForSequenceClassification were not initialized from the model checkpoint at distilbert-base-uncased and are newly initialized: ['pre_classifier.weight', 'classifier.bias', 'pre_classi

Starting Epoch 1/10
Epoch 1, Step 5/1750 (0.29%), Loss: 0.7779
Epoch 1, Step 10/1750 (0.57%), Loss: 0.6948
Epoch 1, Step 15/1750 (0.86%), Loss: 0.6695
Epoch 1, Step 20/1750 (1.14%), Loss: 0.6387
Epoch 1, Step 25/1750 (1.43%), Loss: 0.5104
Epoch 1, Step 30/1750 (1.71%), Loss: 0.5361
Epoch 1, Step 35/1750 (2.00%), Loss: 0.4209
Epoch 1, Step 40/1750 (2.29%), Loss: 0.6797
Epoch 1, Step 45/1750 (2.57%), Loss: 0.5841
Epoch 1, Step 50/1750 (2.86%), Loss: 0.3783
Epoch 1, Step 55/1750 (3.14%), Loss: 0.3943
Epoch 1, Step 60/1750 (3.43%), Loss: 0.4011
Epoch 1, Step 65/1750 (3.71%), Loss: 0.3599
Epoch 1, Step 70/1750 (4.00%), Loss: 0.3869
Epoch 1, Step 75/1750 (4.29%), Loss: 0.4519
Epoch 1, Step 80/1750 (4.57%), Loss: 0.6563
Epoch 1, Step 85/1750 (4.86%), Loss: 0.6288
Epoch 1, Step 90/1750 (5.14%), Loss: 0.4963
Epoch 1, Step 95/1750 (5.43%), Loss: 0.1124
Epoch 1, Step 100/1750 (5.71%), Loss: 0.3515
Epoch 1, Step 105/1750 (6.00%), Loss: 0.4908
Epoch 1, Step 110/1750 (6.29%), Loss: 0.2030
Epoch 1, S

In [4]:
# Persist the fine-tuned model. The module object itself (not just its
# state_dict) is handed to torch.save, so the whole model is serialized.
checkpoint_path = 'db10.pt'
torch.save(model, checkpoint_path)
 