In [1]:
pip install ray==2.9.3

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install ray[client]

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install "ray[train]" 

Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install -U ipywidgets

Note: you may need to restart the kernel to use updated packages.


In [5]:
import ray
# pip packages requested for this job's Ray runtime environment.
runtime_env = {
    "pip": [
        "torch",
        "torchvision",
        "tqdm",
        "IPython",
        "pandas==1.2.4",
        "filelock",
    ],
}

# Ray Client endpoint; the address follows the pattern
#   ray://${RAYCLUSTER_HEAD_SVC}.${NAMESPACE}.svc.cluster.local:${RAY_CLIENT_PORT}
ray.init(
    address="ray://raycluster1-kuberay-head-svc.default.svc.cluster.local:10001",
    runtime_env=runtime_env,
)

# Show what the connected cluster reports as available resources.
print(ray.cluster_resources())

{'node:10.224.189.155': 1.0, 'node:10.224.238.124': 1.0, 'node:__internal_head__': 1.0, 'object_store_memory': 4581757745.0, 'node:10.224.171.216': 1.0, 'GPU': 3.0, 'node:10.224.189.162': 1.0, 'accelerator_type:G': 3.0, 'CPU': 8.0, 'memory': 16000000000.0}


In [7]:
import os
import datetime
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from tqdm import tqdm
from filelock import FileLock
import ray.train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

import numpy as np
import csv
import time

# Generate Poisson-distributed inter-arrival times
lambda_rate_per_hour = 2.4  # Average number of trainings per hour
lambda_rate_per_second = lambda_rate_per_hour / 3600
num_trainings = 1
# For a Poisson process the gaps between arrivals are exponential with mean 1/lambda.
inter_arrival_times_seconds = np.random.exponential(1 / lambda_rate_per_second, num_trainings)

# Convert inter-arrival times to timedelta objects
inter_arrival_timedeltas = [datetime.timedelta(seconds=s) for s in inter_arrival_times_seconds]

# Calculate the scheduled start times as a running sum of the inter-arrival
# times, starting from the current time. Accumulating in a single loop (rather
# than indexing element 0 up front) also works when num_trainings is 0, in
# which case the schedule is simply empty.
current_time = datetime.datetime.now()
scheduled_start_times = []
next_start = current_time
for delta in inter_arrival_timedeltas:
    next_start = next_start + delta
    scheduled_start_times.append(next_start)

def get_dataloaders(batch_size):
    """Build the FashionMNIST train and test DataLoaders.

    Each image is converted to a tensor, repeated to 3 channels, resized to
    224x224 and normalized with the ImageNet mean/std so it can be fed to the
    pretrained MobileNetV3 model.

    Args:
        batch_size: Number of samples per batch for both loaders.

    Returns:
        A ``(train_dataloader, test_dataloader)`` tuple.
    """
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Lambda(lambda x: x.repeat(3, 1, 1)),  # Convert to 3-channel
        transforms.Resize((224, 224)),  # Resize to match MobileNetV3 input size
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])  # ImageNet norms
    ])

    # Hold a file lock while the datasets are downloaded/opened so that
    # processes sharing this home directory do not download at the same time.
    with FileLock(os.path.expanduser("~/data.lock")):
        training_data = datasets.FashionMNIST(
            root="~/data",
            train=True,
            download=True,
            transform=transform
        )

        test_data = datasets.FashionMNIST(
            root="~/data",
            train=False,
            download=True,
            transform=transform
        )

    # Shuffle the training set so SGD does not see the samples in the same
    # fixed order; evaluation order does not matter, so the test loader stays
    # sequential.
    train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
    test_dataloader = DataLoader(test_data, batch_size=batch_size)

    return train_dataloader, test_dataloader

class MobileNetV3Model(nn.Module):
    """MobileNetV3-Large with ImageNet weights and a replaced final classifier layer.

    Args:
        num_classes: Size of the new output layer (defaults to 10, the number
            of FashionMNIST classes).
    """

    def __init__(self, num_classes: int = 10):
        super().__init__()
        # Load a pretrained MobileNetV3 model. Newer torchvision releases
        # deprecate ``pretrained=True`` in favour of the ``weights=`` enum;
        # IMAGENET1K_V1 is the checkpoint that ``pretrained=True`` refers to.
        # Fall back to the legacy keyword when the enum is not available.
        weights_enum = getattr(models, "MobileNet_V3_Large_Weights", None)
        if weights_enum is not None:
            self.model = models.mobilenet_v3_large(weights=weights_enum.IMAGENET1K_V1)
        else:
            self.model = models.mobilenet_v3_large(pretrained=True)
        # Replace the last classifier layer so the output size matches num_classes.
        final_layer = self.model.classifier[3]
        self.model.classifier[3] = nn.Linear(final_layer.in_features, num_classes)

    def forward(self, x):
        """Run ``x`` through the wrapped MobileNetV3 network."""
        return self.model(x)

def train_func_per_worker(config: dict):
    """Training loop that Ray Train runs on each worker.

    Args:
        config: Dictionary with the keys ``"lr"`` (SGD learning rate),
            ``"epochs"`` (number of epochs) and ``"batch_size_per_worker"``
            (batch size passed to the data loaders).

    After every epoch the mean test loss and the accuracy measured over this
    worker's test loader are reported via ``ray.train.report``.
    """
    lr = config["lr"]
    epochs = config["epochs"]
    batch_size = config["batch_size_per_worker"]

    train_dataloader, test_dataloader = get_dataloaders(batch_size)

    # Let Ray Train adapt the loaders and the model for distributed execution.
    train_dataloader = ray.train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = ray.train.torch.prepare_data_loader(test_dataloader)

    model = MobileNetV3Model()
    model = ray.train.torch.prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    # A distributed sampler derives its shuffle order from the epoch number,
    # so it must be told which epoch is starting. Samplers without
    # ``set_epoch`` (e.g. a single-worker run) are left alone.
    train_sampler = getattr(train_dataloader, "sampler", None)

    for epoch in range(epochs):
        if hasattr(train_sampler, "set_epoch"):
            train_sampler.set_epoch(epoch)

        model.train()
        for X, y in tqdm(train_dataloader, desc=f"Train Epoch {epoch}"):
            pred = model(X)
            loss = loss_fn(pred, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        model.eval()
        test_loss, num_correct, num_total = 0, 0, 0
        with torch.no_grad():
            for X, y in tqdm(test_dataloader, desc=f"Test Epoch {epoch}"):
                pred = model(X)
                loss = loss_fn(pred, y)

                test_loss += loss.item()
                num_total += y.shape[0]
                num_correct += (pred.argmax(1) == y).sum().item()

        # Average the summed batch losses over the number of test batches.
        test_loss /= len(test_dataloader)
        accuracy = num_correct / num_total

        ray.train.report(metrics={"loss": test_loss, "accuracy": accuracy})

def train_fashion_mnist(num_workers=2, use_gpu=False, global_batch_size=32, lr=1e-3, epochs=1):
    """Launch one distributed training run and return its final metrics.

    Args:
        num_workers: Number of Ray Train workers; must be at least 1.
        use_gpu: Whether each worker should use a GPU.
        global_batch_size: Batch size that is split evenly (integer division)
            across the workers.
        lr: Learning rate passed to the per-worker training loop.
        epochs: Number of epochs passed to the per-worker training loop.

    Returns:
        A ``(loss, accuracy)`` tuple taken from the metrics of the fit result.

    Raises:
        ValueError: If ``num_workers`` is below 1 or the per-worker batch size
            would round down to 0.
    """
    # Fail fast on the driver instead of dividing by zero here or handing a
    # batch size of 0 to the workers.
    if num_workers < 1:
        raise ValueError(f"num_workers must be >= 1, got {num_workers}")
    batch_size_per_worker = global_batch_size // num_workers
    if batch_size_per_worker < 1:
        raise ValueError(
            f"global_batch_size={global_batch_size} is too small to split across "
            f"{num_workers} workers"
        )

    train_config = {
        "lr": lr,
        "epochs": epochs,
        "batch_size_per_worker": batch_size_per_worker,
    }

    scaling_config = ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)

    trainer = TorchTrainer(
        train_loop_per_worker=train_func_per_worker,
        train_loop_config=train_config,
        scaling_config=scaling_config,
    )

    result = trainer.fit()
    print(f"Training result: {result}")

    loss = result.metrics['loss']
    accuracy = result.metrics['accuracy']

    return loss, accuracy

# Function to run the training and log the times
def run_and_log_training(log_path='training_log_small.csv', wait_for_schedule=False):
    """Run one training session per scheduled start time and log timings to CSV.

    Args:
        log_path: CSV file to (over)write with one row per training session.
        wait_for_schedule: When True, sleep until each scheduled start time
            (capped at one hour per wait) before launching the session. The
            default False keeps the previous behaviour: the wait is only
            printed and the session starts immediately.

    Each row holds the scheduled start, actual start, end time, duration, loss
    and accuracy of a session.
    """
    timestamp_format = '%Y-%m-%d %H:%M:%S'

    with open(log_path, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["Scheduled Start Time", "Actual Start Time", "End Time", "Duration", "Loss", "Accuracy"])

        for i, scheduled_start in enumerate(scheduled_start_times):
            current_time = datetime.datetime.now()
            if scheduled_start > current_time:
                # Time remaining until the scheduled start
                time_to_wait = (scheduled_start - current_time).total_seconds()

                # If the waiting time is more than 1 hour (3600 seconds), reduce it to 1 hour
                if time_to_wait > 3600:
                    print("Original waiting time exceeds 1 hour. Reducing to 1 hour.")
                    time_to_wait = 3600  # Reduce waiting time to 1 hour

                print(f"Waiting {time_to_wait:.2f} seconds until the next scheduled start time.")
                if wait_for_schedule:
                    time.sleep(time_to_wait)

            actual_start_time = datetime.datetime.now()

            # Run the training session
            loss, accuracy = train_fashion_mnist(num_workers=3, use_gpu=True)

            end_time = datetime.datetime.now()
            duration = end_time - actual_start_time

            scheduled_str = scheduled_start.strftime(timestamp_format)
            actual_str = actual_start_time.strftime(timestamp_format)
            end_str = end_time.strftime(timestamp_format)

            print(f"Training Session {i+1}")
            print(f"Scheduled Start Time: {scheduled_str}")
            print(f"Actual Start Time: {actual_str}")
            print(f"End Time: {end_str}")
            print(f"Duration: {duration}")
            print("-" * 50)

            # Write the times to the CSV file and flush right away so finished
            # sessions are on disk even if a later session is interrupted.
            writer.writerow([scheduled_str, actual_str, end_str, duration, loss, accuracy])
            file.flush()

# Entry point: notebook cells execute with __name__ == "__main__", so running
# this cell starts the scheduled training sessions and writes the CSV log.
if __name__ == "__main__":
    run_and_log_training()


[36m(TunerInternal pid=143649)[0m [output] This will use the new output engine with verbosity 1. To disable the new output and use the legacy output engine, set the environment variable RAY_AIR_NEW_OUTPUT=0. For more information, please see https://github.com/ray-project/ray/issues/36949
[36m(TunerInternal pid=143649)[0m AIR_VERBOSITY is set, ignoring passed-in ProgressReporter for now.


[36m(TunerInternal pid=143649)[0m 
[36m(TunerInternal pid=143649)[0m View detailed results here: /home/ray/ray_results/TorchTrainer_2024-04-03_07-18-30
[36m(TunerInternal pid=143649)[0m To visualize your results with TensorBoard, run: `tensorboard --logdir /home/ray/ray_results/TorchTrainer_2024-04-03_07-18-30`
[36m(TunerInternal pid=143649)[0m 
[36m(TunerInternal pid=143649)[0m Training started with configuration:
[36m(TunerInternal pid=143649)[0m ╭─────────────────────────────────────────────────╮
[36m(TunerInternal pid=143649)[0m │ Training config                                 │
[36m(TunerInternal pid=143649)[0m ├─────────────────────────────────────────────────┤
[36m(TunerInternal pid=143649)[0m │ train_loop_config/batch_size_per_worker      10 │
[36m(TunerInternal pid=143649)[0m │ train_loop_config/epochs                      1 │
[36m(TunerInternal pid=143649)[0m │ train_loop_config/lr                      0.001 │
[36m(TunerInternal pid=143649)[0m ╰──────

[36m(RayTrainWorker pid=1368, ip=10.224.171.216)[0m Setting up process group for: env:// [rank=0, world_size=3]
[36m(TorchTrainer pid=1315, ip=10.224.171.216)[0m Started distributed worker processes: 
[36m(TorchTrainer pid=1315, ip=10.224.171.216)[0m - (ip=10.224.171.216, pid=1368) world_rank=0, local_rank=0, node_rank=0
[36m(TorchTrainer pid=1315, ip=10.224.171.216)[0m - (ip=10.224.189.162, pid=1378) world_rank=1, local_rank=0, node_rank=1
[36m(TorchTrainer pid=1315, ip=10.224.171.216)[0m - (ip=10.224.238.124, pid=1035) world_rank=2, local_rank=0, node_rank=2
[36m(RayTrainWorker pid=1368, ip=10.224.171.216)[0m Moving model to device: cuda:0
[36m(RayTrainWorker pid=1368, ip=10.224.171.216)[0m Wrapping provided model in DistributedDataParallel.
[36m(RayTrainWorker pid=1035, ip=10.224.238.124)[0m Moving model to device: cuda:0
[36m(RayTrainWorker pid=1378, ip=10.224.189.162)[0m Moving model to device: cuda:0
[36m(RayTrainWorker pid=1378, ip=10.224.189.162)[0m Wrapping

[36m(TunerInternal pid=143649)[0m 
[36m(TunerInternal pid=143649)[0m Training finished iteration 1 at 2024-04-03 07:26:05. Total running time: 7min 34s
[36m(TunerInternal pid=143649)[0m ╭───────────────────────────────╮
[36m(TunerInternal pid=143649)[0m │ Training result               │
[36m(TunerInternal pid=143649)[0m ├───────────────────────────────┤
[36m(TunerInternal pid=143649)[0m │ checkpoint_dir_name           │
[36m(TunerInternal pid=143649)[0m │ time_this_iter_s      450.877 │
[36m(TunerInternal pid=143649)[0m │ time_total_s          450.877 │
[36m(TunerInternal pid=143649)[0m │ training_iteration          1 │
[36m(TunerInternal pid=143649)[0m │ accuracy              0.74445 │
[36m(TunerInternal pid=143649)[0m │ loss                   0.7175 │
[36m(TunerInternal pid=143649)[0m ╰───────────────────────────────╯
Training result: Result(
  metrics={'loss': 0.7175043074046067, 'accuracy': 0.7444511097780444},
  path='/home/ray/ray_results/TorchTrainer_2024

In [16]:
# Close the Ray connection that was opened with ray.init() earlier in the notebook.
ray.shutdown()