In [0]:
import os
import sys

# Location of the ScaleML package. Set the SCALEML_PATH environment variable to
# point elsewhere instead of editing a user-specific absolute path in the notebook.
SCALEML_PATH = os.environ.get("SCALEML_PATH", "/Workspace/Users/bjedelma@gmail.com/ScaleML/scaleml")

# Guard the append so re-running this cell does not add duplicate sys.path entries.
if SCALEML_PATH not in sys.path:
    sys.path.append(SCALEML_PATH)

import scaleml

Detect computational resources on the current node

In [0]:
# Probe this node's compute resources once; later cells pass the result to
# scaleml.strategies() and scaleml.distributed_train().
detected_resources = scaleml.resources()

Specify the distributed strategy

In [0]:
# Framework selection: 'model' names the library the model is written in and
# 'strategy' names the distribution backend used to train it.
framework = dict(model="tensorflow", strategy="tensorflow")
framework["model"]

In [0]:
# Input as a dictionary: 'model' is the library the model is written in and
# 'strategy' is the distribution backend ('tensorflow', 'pytorch' or 'horovod').
framework = {
    "model": "tensorflow",
    "strategy": "tensorflow",
}

# Bind the strategy to a notebook variable so the training cell can use it, and
# keep the module attribute for any code that reads scaleml.dist_strategy.
strategy = scaleml.strategies(framework, detected_resources, devices='all')
scaleml.dist_strategy = strategy

In [0]:

"""
Sets up a distributed training strategy based on the chosen framework, available resources, and specified devices.

Args:
    framework (dict): A dictionary containing the framework details.
                        Expected keys: 'model', 'strategy'.
                        Options for 'model': 'tensorflow', 'pytorch'.
                        Options for 'strategy': 'tensorflow', 'pytorch', 'horovod'.
    detected_resources (dict): A dictionary containing the detected resources. 
                        Expected keys: 'logical_cores', 'gpu_devices'.
    devices (str): The type of devices to use. Options: 'cpu', 'gpu', 'all'.

Returns:
    Distributed ScaleML training strategy appropriate for the model.
"""

# Check if detected resources are provided, if not raise an error.
if detected_resources is None:
    raise ValueError("Resources must be provided. Use the 'resources()' function to detect resources.")
    
# Extract logical cores and GPU devices from detected resources.
logical_cores = [i for i, resource in enumerate(resources) if 'cpu' in resource]
gpu_devices = [i for i, resource in enumerate(resources) if 'gpu' in resource]
if devices in ['gpu', 'all'] and not gpu_devices:
    print("No GPUs exist. Only using CPU resources.")
    devices = 'cpu'

# extract framework assignments
framework_model = framework.get('model')
framework_strategy = framework.get('strategy')


# Create a istributed strategy depending on the input framework
if framework_strategy.lower() == 'tensorflow':
    
    if framework_model.lower() != 'tensorflow':
        raise ValueError("Incompatible model for TensorFlow strategy. Please use a TensorFlow model.")
    else:
        import tensorflow as tf
        
        # TensorFlow strategy setup
        if devices == 'all':
            strategy = tf.distribute.MirroredStrategy()
        elif devices == 'gpu' and gpu_devices:
            strategy = tf.distribute.MirroredStrategy(devices=[f"/gpu:{i}" for i in range(len(gpu_devices))])
        else:
            strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
        print(f"TensorFlow strategy initialized: {strategy}")
        return strategy

elif framework_strategy.lower() == 'pytorch':
    
    if framework_model.lower() != 'pytorch':
        raise ValueError("Incompatible model for PyTorch strategy. Please use a PyTorch model.")
    else:
        import torch
        
        # PyTorch device setup
        if devices == 'all':
            strategy = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        elif devices == 'gpu' and gpu_devices:
            strategy = torch.device("cuda")
        else:
            strategy = torch.device("cpu")
        print(f"PyTorch device: {strategy}")
        return strategy

elif framework_strategy.lower() == 'horovod':
    
    import horovod.tensorflow as hvd_tf
    import horovod.torch as hvd_torch
    
    if framework_model.lower() == 'tensorflow':
        hvd_tf.init()
        print(f"Horovod TensorFlow rank: {hvd_tf.rank()}, size: {hvd_tf.size()}")
        
        if gpu_devices:
            import tensorflow as tf
            tf.config.experimental.set_visible_devices(gpu_devices[hvd_tf.local_rank()], 'GPU')
        return hvd_tf
    
    elif framework_model.lower() == 'pytorch':
        hvd_torch.init()
        print(f"Horovod PyTorch rank: {hvd_torch.rank()}, size: {hvd_torch.size()}")
        
        if gpu_devices:
            import torch
            torch.cuda.set_device(hvd_torch.local_rank())
        return hvd_torch

else:
    raise ValueError("Unsupported framework or model. Choose from 'tensorflow', 'pytorch', or 'horovod'.")

Load the MNIST dataset as an example

In [0]:
import tensorflow as tf

# MNIST as example model and dataset (you can replace with your actual dataset/model)
BATCH_SIZE = 64

mnist = tf.keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()

# Normalize the images to a range of 0 to 1. Dividing the raw arrays by 255.0
# yields float64 in NumPy; cast to float32 to halve memory and match the float32
# default of Keras layers.
train_images = (train_images / 255.0).astype("float32")
test_images = (test_images / 255.0).astype("float32")

# Flatten each image into a 784-long feature vector.
train_images = train_images.reshape(-1, 784)
test_images = test_images.reshape(-1, 784)

train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).batch(BATCH_SIZE)

# Show array shapes rather than dumping every image into the cell output.
display({
    "train_images": train_images.shape,
    "train_labels": train_labels.shape,
    "test_images": test_images.shape,
    "test_labels": test_labels.shape,
})

Train model

In [0]:
# Train with the strategy created by scaleml.strategies(), which the strategy
# cell stores on scaleml.dist_strategy.
scaleml.distributed_train(detected_resources, framework, scaleml.dist_strategy, train_dataset, log_resources=True)

In [0]:
def distributed_train(detected_resources, framework, strategy, train_dataset, log_resources=True, epochs=20):
    """
    Train a model using a ScaleML strategy and track resource usage (optional).

    Parameters:
    - detected_resources (dict): Output of resources(). Required when
                         log_resources is True.
    - framework (dict): A dictionary containing the framework details; passed
                         to create_model().
                         Expected keys: 'model', 'strategy'.
                         Options for 'model': 'tensorflow', 'pytorch'.
                         Options for 'strategy': 'tensorflow', 'pytorch', 'horovod'.
    - strategy: Object returned by strategies(): a tf.distribute.Strategy,
                a torch.device, or an initialised horovod.tensorflow /
                horovod.torch module.
    - train_dataset: Iterable of (inputs, labels) batches, e.g. a tf.data.Dataset
                or a PyTorch DataLoader. The first batch is used to infer the
                model input shape.
    - log_resources: Boolean flag to enable/disable resource logging.
    - epochs (int): Number of training epochs (default 20).

    Returns:
    - The history returned by model.fit() (TensorFlow / Horovod-TensorFlow)
      or by train_pytorch_model() (PyTorch / Horovod-PyTorch).

    Raises:
    - ValueError: if log_resources is True without detected_resources, if
      train_dataset yields no batches, or if strategy has an unsupported type.
    """
    import sys
    import types

    # log resource use throughout training
    if log_resources:
        if not detected_resources:
            raise ValueError("Please include output from resources() to log resources")

        # Start logging resources in the background (you can adjust the interval as needed)
        log_resource_usage(detected_resources, interval=10)

    # try/finally so the resource logger is stopped on every exit path,
    # including early returns and training errors.
    try:
        input_shape = _first_batch_input_shape(train_dataset)

        # Look frameworks up in sys.modules instead of importing them: a
        # tf.distribute.Strategy or torch.device instance can only exist if its
        # framework is already loaded, and this avoids requiring every framework
        # to be installed.
        tf = sys.modules.get("tensorflow")
        torch = sys.modules.get("torch")
        # Horovod strategies are modules (e.g. 'horovod.tensorflow'), so
        # identify them by module name; type(module) is just <class 'module'>.
        module_name = strategy.__name__ if isinstance(strategy, types.ModuleType) else ""

        if tf is not None and isinstance(strategy, tf.distribute.Strategy):
            # Build the model inside the scope so its variables are created
            # by (and distributed with) the strategy.
            with strategy.scope():
                model = create_model(framework, input_shape)
                history = model.fit(train_dataset, epochs=epochs)

        elif torch is not None and isinstance(strategy, torch.device):
            model = create_model(framework, input_shape)
            model.to(strategy)
            history = train_pytorch_model(model, train_dataset, epochs=epochs)

        elif module_name.startswith("horovod.tensorflow"):
            # NOTE(review): Horovod Keras training usually also wraps the
            # optimizer and broadcasts initial variables; presumably
            # create_model handles that — confirm.
            model = create_model(framework, input_shape)
            history = model.fit(train_dataset, epochs=epochs)

        elif module_name.startswith("horovod.torch"):
            import torch

            # Place the model on this process's local GPU; the strategy itself
            # is a module, not a device.
            if torch.cuda.is_available():
                device = torch.device("cuda", strategy.local_rank())
            else:
                device = torch.device("cpu")
            model = create_model(framework, input_shape)
            model.to(device)
            history = train_pytorch_model(model, train_dataset, epochs=epochs)

        else:
            raise ValueError(f"Unsupported strategy type: {type(strategy).__name__}")
    finally:
        if log_resources:
            # Stop the logging once training is complete
            _stop_resource_logging()

    print("Training completed.")
    return history


def _first_batch_input_shape(train_dataset):
    """Return the per-sample input shape (batch dimension dropped) of the first batch."""
    # tf.data.Dataset exposes take(); other iterables (e.g. a DataLoader) are iterated directly.
    batches = train_dataset.take(1) if hasattr(train_dataset, "take") else train_dataset
    for images, _labels in batches:
        return images.shape[1:]
    raise ValueError("train_dataset yielded no batches; cannot infer the model input shape.")


def _stop_resource_logging():
    """Signal the background resource logger to stop, if its stop event exists."""
    # NOTE(review): assumes log_resource_usage() is controlled by a module-level
    # `stop_event` with a set() method (e.g. threading.Event) — confirm. Looked up
    # via globals() so a missing event does not raise NameError after training.
    stop_event = globals().get("stop_event")
    if stop_event is not None:
        stop_event.set()

In [0]:
def strategies(framework, detected_resources, devices):
    """
    Sets up a distributed training strategy based on the chosen framework, available resources, and specified devices.

    Args:
        framework (dict): A dictionary containing the framework details.
                         Expected keys: 'model', 'strategy' (both required,
                         matched case-insensitively).
                         Options for 'model': 'tensorflow', 'pytorch'.
                         Options for 'strategy': 'tensorflow', 'pytorch', 'horovod'.
        detected_resources (dict): Output of resources(). Only the
                         'gpu_devices' entry is read (defaults to [] when absent).
        devices (str): The type of devices to use (case-insensitive).
                         Options: 'cpu', 'gpu', 'all'. 'gpu' falls back to
                         'cpu' when no GPUs are listed.

    Returns:
        Distributed ScaleML training strategy appropriate for the model:
        - 'tensorflow' strategy: a tf.distribute.MirroredStrategy or
          tf.distribute.OneDeviceStrategy.
        - 'pytorch' strategy: a torch.device.
        - 'horovod' strategy: the initialised horovod.tensorflow or
          horovod.torch module.

    Raises:
        ValueError: if resources are missing, the devices value is unknown,
            framework lacks 'model' or 'strategy', or the model/strategy
            combination is unsupported.
    """
    # Check if detected resources are provided, if not raise an error.
    if detected_resources is None:
        raise ValueError("Resources must be provided. Use the 'resources()' function to detect resources.")

    # Normalise the device request so 'GPU' and 'gpu' behave the same, and reject
    # typos instead of silently falling through to the CPU branch.
    devices = str(devices).lower()
    if devices not in ('cpu', 'gpu', 'all'):
        raise ValueError(f"Unsupported devices value {devices!r}. Choose from 'cpu', 'gpu', or 'all'.")

    # Only the GPU list is needed to choose devices.
    gpu_devices = detected_resources.get('gpu_devices', [])
    if devices == 'gpu' and not gpu_devices:
        print("No GPUs exist. Only using CPU resources.")
        devices = 'cpu'

    # Extract framework assignments; both keys are required.
    framework = framework or {}
    framework_model = framework.get('model')
    framework_strategy = framework.get('strategy')
    if not framework_model or not framework_strategy:
        raise ValueError("framework must define both 'model' and 'strategy'.")
    framework_model = framework_model.lower()
    framework_strategy = framework_strategy.lower()

    # Create a distributed strategy depending on the input framework.
    if framework_strategy == 'tensorflow':
        if framework_model != 'tensorflow':
            raise ValueError("Incompatible model for TensorFlow strategy. Please use a TensorFlow model.")
        import tensorflow as tf

        if devices == 'all':
            strategy = tf.distribute.MirroredStrategy()
        elif devices == 'gpu':
            # The fallback above guarantees gpu_devices is non-empty here.
            strategy = tf.distribute.MirroredStrategy(devices=[f"/gpu:{i}" for i in range(len(gpu_devices))])
        else:
            strategy = tf.distribute.OneDeviceStrategy(device="/cpu:0")
        print(f"TensorFlow strategy initialized: {strategy}")
        return strategy

    if framework_strategy == 'pytorch':
        if framework_model != 'pytorch':
            raise ValueError("Incompatible model for PyTorch strategy. Please use a PyTorch model.")
        import torch

        if devices == 'all':
            strategy = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        elif devices == 'gpu':
            strategy = torch.device("cuda")
        else:
            strategy = torch.device("cpu")
        print(f"PyTorch device: {strategy}")
        return strategy

    if framework_strategy == 'horovod':
        # Import only the Horovod binding that is needed, so a TensorFlow-only
        # environment does not require PyTorch (and vice versa).
        if framework_model == 'tensorflow':
            import horovod.tensorflow as hvd_tf
            import tensorflow as tf

            hvd_tf.init()
            print(f"Horovod TensorFlow rank: {hvd_tf.rank()}, size: {hvd_tf.size()}")

            # Pin one GPU per local process; set_visible_devices expects
            # PhysicalDevice objects, so take them from TensorFlow itself rather
            # than from detected_resources.
            physical_gpus = tf.config.list_physical_devices('GPU')
            if physical_gpus:
                tf.config.set_visible_devices(physical_gpus[hvd_tf.local_rank()], 'GPU')
            return hvd_tf

        if framework_model == 'pytorch':
            import horovod.torch as hvd_torch
            import torch

            hvd_torch.init()
            print(f"Horovod PyTorch rank: {hvd_torch.rank()}, size: {hvd_torch.size()}")

            # torch.cuda.set_device needs CUDA; skip pinning on CPU-only nodes.
            if torch.cuda.is_available():
                torch.cuda.set_device(hvd_torch.local_rank())
            return hvd_torch

        raise ValueError("Unsupported model for Horovod strategy. Choose from 'tensorflow' or 'pytorch'.")

    raise ValueError("Unsupported framework or model. Choose from 'tensorflow', 'pytorch', or 'horovod'.")