In [0]:
 %sh
 rm -r /dbfs/deepml_lab
 mkdir /dbfs/deepml_lab
 wget -O /dbfs/deepml_lab/penguins.csv https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv

rm: cannot remove '/dbfs/deepml_lab': No such file or directory
--2024-04-04 14:08:33--  https://raw.githubusercontent.com/MicrosoftLearning/mslearn-databricks/main/data/penguins.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.109.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 9533 (9.3K) [text/plain]
Saving to: ‘/dbfs/deepml_lab/penguins.csv’

     0K .........                                             100% 1.96M=0.005s

2024-04-04 14:08:33 (1.96 MB/s) - ‘/dbfs/deepml_lab/penguins.csv’ saved [9533/9533]



In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from sklearn.model_selection import train_test_split
   
# Load the data, removing any incomplete rows
df = spark.read.format("csv").option("header", "true").load("/deepml_lab/penguins.csv").dropna()
   
# Encode the Island with a simple integer index
# Scale FlipperLength and BodyMass so they're on a similar scale to the bill measurements
islands = df.select(collect_set("Island").alias('Islands')).first()['Islands']
island_indexes = [(islands[i], i) for i in range(0, len(islands))]
df_indexes = spark.createDataFrame(island_indexes).toDF('Island', 'IslandIdx')
data = df.join(df_indexes, ['Island'], 'left').select(col("IslandIdx"),
                   col("CulmenLength").astype("float"),
                   col("CulmenDepth").astype("float"),
                   (col("FlipperLength").astype("float")/10).alias("FlipperScaled"),
                    (col("BodyMass").astype("float")/100).alias("MassScaled"),
                   col("Species").astype("int")
                    )
   
# Oversample the dataframe to triple its size
# (Deep learning techniques like LOTS of data)
for i in range(1,3):
    data = data.union(data)
   
# Split the data into training and testing datasets   
features = ['IslandIdx','CulmenLength','CulmenDepth','FlipperScaled','MassScaled']
label = 'Species'
      
# Split data 70%-30% into training set and test set
x_train, x_test, y_train, y_test = train_test_split(data.toPandas()[features].values,
                                                    data.toPandas()[label].values,
                                                    test_size=0.30,
                                                    random_state=0)
   
print ('Training Set: %d rows, Test Set: %d rows \n' % (len(x_train), len(x_test)))

Training Set: 957 rows, Test Set: 411 rows 



In [0]:
import torch
import torch.nn as nn
import torch.utils.data as td
import torch.nn.functional as F
   
# Set random seed for reproducability
torch.manual_seed(0)
   
print("Libraries imported - ready to use PyTorch", torch.__version__)

Libraries imported - ready to use PyTorch 2.0.1+cpu


In [0]:
# Create a dataset and loader for the training data and labels
train_x = torch.Tensor(x_train).float()
train_y = torch.Tensor(y_train).long()
train_ds = td.TensorDataset(train_x,train_y)
train_loader = td.DataLoader(train_ds, batch_size=20,
    shuffle=False, num_workers=1)

# Create a dataset and loader for the test data and labels
test_x = torch.Tensor(x_test).float()
test_y = torch.Tensor(y_test).long()
test_ds = td.TensorDataset(test_x,test_y)
test_loader = td.DataLoader(test_ds, batch_size=20,
                             shuffle=False, num_workers=1)
print('Ready to load data')

Ready to load data


In [0]:
# Number of hidden layer nodes
hl = 10
   
# Define the neural network
class PenguinNet(nn.Module):
    def __init__(self):
        super(PenguinNet, self).__init__()
        self.fc1 = nn.Linear(len(features), hl)
        self.fc2 = nn.Linear(hl, hl)
        self.fc3 = nn.Linear(hl, 3)
   
    def forward(self, x):
        fc1_output = torch.relu(self.fc1(x))
        fc2_output = torch.relu(self.fc2(fc1_output))
        y = F.log_softmax(self.fc3(fc2_output).float(), dim=1)
        return y
   
# Create a model instance from the network
model = PenguinNet()
print(model)

PenguinNet(
  (fc1): Linear(in_features=5, out_features=10, bias=True)
  (fc2): Linear(in_features=10, out_features=10, bias=True)
  (fc3): Linear(in_features=10, out_features=3, bias=True)
)


In [0]:
def train(model, data_loader, optimizer):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    # Set the model to training mode
    model.train()
    train_loss = 0
       
    for batch, tensor in enumerate(data_loader):
        data, target = tensor
        #feedforward
        optimizer.zero_grad()
        out = model(data)
        loss = loss_criteria(out, target)
        train_loss += loss.item()
   
        # backpropagate adjustments to the weights
        loss.backward()
        optimizer.step()
   
    #Return average loss
    avg_loss = train_loss / (batch+1)
    print('Training set: Average loss: {:.6f}'.format(avg_loss))
    return avg_loss
              
               
def test(model, data_loader):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    # Switch the model to evaluation mode (so we don't backpropagate)
    model.eval()
    test_loss = 0
    correct = 0
   
    with torch.no_grad():
        batch_count = 0
        for batch, tensor in enumerate(data_loader):
            batch_count += 1
            data, target = tensor
            # Get the predictions
            out = model(data)
   
            # calculate the loss
            test_loss += loss_criteria(out, target).item()
   
            # Calculate the accuracy
            _, predicted = torch.max(out.data, 1)
            correct += torch.sum(target==predicted).item()
               
    # Calculate the average loss and total accuracy for this epoch
    avg_loss = test_loss/batch_count
    print('Validation set: Average loss: {:.6f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        avg_loss, correct, len(data_loader.dataset),
        100. * correct / len(data_loader.dataset)))
       
    # return average loss for the epoch
    return avg_loss

In [0]:
import horovod.torch as hvd
from sparkdl import HorovodRunner
   
def train_hvd(model):
    from torch.utils.data.distributed import DistributedSampler
       
    hvd.init()
       
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if device.type == 'cuda':
        # Pin GPU to local rank
        torch.cuda.set_device(hvd.local_rank())
       
    # Configure the sampler so that each worker gets a distinct sample of the input dataset
    train_sampler = DistributedSampler(train_ds, num_replicas=hvd.size(), rank=hvd.rank())
    # Use train_sampler to load a different sample of data on each worker
    train_loader = torch.utils.data.DataLoader(train_ds, batch_size=20, sampler=train_sampler)
       
    # The effective batch size in synchronous distributed training is scaled by the number of workers
    # Increase learning_rate to compensate for the increased batch size
    learning_rate = 0.001 * hvd.size()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
       
    # Wrap the local optimizer with hvd.DistributedOptimizer so that Horovod handles the distributed optimization
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
   
    # Broadcast initial parameters so all workers start with the same parameters
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)
   
    optimizer.zero_grad()
   
    # Train over 50 epochs
    epochs = 100
    for epoch in range(1, epochs + 1):
        print('Epoch: {}'.format(epoch))
        # Feed training data into the model to optimize the weights
        train_loss = train(model, train_loader, optimizer)
   
    # Save the model weights
    if hvd.rank() == 0:
        model_file = '/dbfs/penguin_classifier_hvd.pt'
        torch.save(model.state_dict(), model_file)
        print('model saved as', model_file)

  "class": algorithms.Blowfish,


In [0]:
# Reset random seed for PyTorch
torch.manual_seed(0)
   
# Create a new model
new_model = PenguinNet()
   
# We'll use CrossEntropyLoss to optimize a multiclass classifier
loss_criteria = nn.CrossEntropyLoss()
   
# Run the distributed training function on 2 nodes
hr = HorovodRunner(np=2, driver_log_verbosity='all') 
hr.run(train_hvd, model=new_model)
   
# Load the trained weights and test the model
test_model = PenguinNet()
test_model.load_state_dict(torch.load('/dbfs/penguin_classifier_hvd.pt'))
test_loss = test(test_model, test_loader)

The global names read or written to by the pickled function are {'hvd': None, 'torch': None, 'train_ds': None, 'range': None, 'print': None, 'train': None}.
The pickled object size is 35743 bytes.

### How to enable Horovod Timeline? ###
HorovodRunner has the ability to record the timeline of its activity with Horovod  Timeline. To
record a Horovod Timeline, set the `HOROVOD_TIMELINE` environment variable  to the location of the
timeline file to be created. You can then open the timeline file  using the chrome://tracing
facility of the Chrome browser.

Start training.


[1,0]<stderr>:  "class": algorithms.Blowfish,
[1,1]<stderr>:  "class": algorithms.Blowfish,
[1,0]<stderr>:/sbin/ldconfig.real: Can't link /lib/libhadoop.so.1.0.0 to libhadoop.so
[1,0]<stderr>:/sbin/ldconfig.real: Renaming of /etc/ld.so.cache~ to /etc/ld.so.cache failed: No such file or directory
[1,0]<stdout>:Epoch: 1
[1,1]<stdout>:Epoch: 1
[1,0]<stdout>:Training set: Average loss: 0.890916
[1,0]<stdout>:Epoch: 2
[1,1]<stdout>:Training set: Average loss: 0.910780
[1,1]<stdout>:Epoch: 2
[1,0]<stdout>:Training set: Average loss: 0.689195
[1,0]<stdout>:Epoch: 3
[1,1]<stdout>:Training set: Average loss: 0.680058
[1,1]<stdout>:Epoch: 3
[1,0]<stdout>:Training set: Average loss: 0.560151
[1,0]<stdout>:Epoch: 4
[1,1]<stdout>:Training set: Average loss: 0.554905
[1,1]<stdout>:Epoch: 4
[1,0]<stdout>:Training set: Average loss: 0.439088
[1,0]<stdout>:Epoch: 5
[1,1]<stdout>:Training set: Average loss: 0.433271
[1,1]<stdout>:Epoch: 5
[1,0]<stdout>:Training set: Average loss: 0.316075
[1,0]<stdout>: