# Transfer learning with Hugging Face using CodeFlare

In this notebook you will learn how to use the **[Hugging Face](https://huggingface.co/)** support in the Ray ecosystem to carry out a text classification task using transfer learning. We will reference the examples **[here](https://huggingface.co/docs/transformers/tasks/sequence_classification)** and **[here](https://docs.ray.io/en/latest/train/getting-started-transformers.html)**.

The example carries out a text classification task on the **[IMDb dataset](https://huggingface.co/datasets/imdb)**, classifying movie reviews as positive or negative. The Hugging Face libraries provide an easy way to load the dataset and build a model for this classification task. In this case we use the **distilbert-base-uncased** model, which is a **BERT**-based model.

### Getting all the requirements in place

In [1]:
# Import pieces from codeflare-sdk
from codeflare_sdk import Cluster, ClusterConfiguration, TokenAuthentication

In [2]:
# Create authentication object for user permissions
# If unused, the SDK will automatically check for a default kubeconfig, then the in-cluster config
# KubeConfigFileAuthentication can also be used to specify kubeconfig path manually
auth = TokenAuthentication(
    token = "XXXX",
    server = "XXXX",
    skip_tls = False
)
auth.login()

'Logged into https://api.akram.3psf.p3.openshiftapps.com:443'
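Hard-coding a token in a notebook cell makes it easy to leak. A minimal sketch, assuming the token and server URL are exported as environment variables named `OPENSHIFT_TOKEN` and `OPENSHIFT_SERVER` (hypothetical names chosen for this example, not something the SDK defines), reads them with a small hypothetical helper, `load_credentials`, before the authentication object is built:

```python
import os


def load_credentials(env=None):
    """Return the token and server URL from environment variables.

    OPENSHIFT_TOKEN and OPENSHIFT_SERVER are hypothetical names used only
    in this sketch; substitute whatever your environment provides.
    """
    env = os.environ if env is None else env
    required = ("OPENSHIFT_TOKEN", "OPENSHIFT_SERVER")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {"token": env["OPENSHIFT_TOKEN"], "server": env["OPENSHIFT_SERVER"]}


# Example with an explicit mapping instead of the real environment.
creds = load_credentials({"OPENSHIFT_TOKEN": "XXXX", "OPENSHIFT_SERVER": "XXXX"})
print(sorted(creds))
```

The returned dictionary could then be passed on as `TokenAuthentication(**creds, skip_tls=False)`, matching the arguments used in the cell above.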

Here, we want to define our cluster by specifying the resources we require for our batch workload. Below, we define our cluster object (which generates a corresponding Ray Cluster).

NOTE: The default images used by the CodeFlare SDK for creating a RayCluster resource depend on the installed Python version:

- For Python 3.9: 'quay.io/modh/ray:2.35.0-py39-cu121'
- For Python 3.11: 'quay.io/modh/ray:2.35.0-py311-cu121'

If you prefer to use a custom Ray image that better suits your needs, you can specify it in the image field to override the default.
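To see which of the listed defaults applies to your notebook environment, the following sketch looks up the image for the running Python version. It only mirrors the two entries in the note above; the helper name `default_ray_image` is hypothetical and this is not how the SDK itself implements the selection.

```python
import sys

# Default images exactly as listed in the note above.
DEFAULT_IMAGES = {
    (3, 9): "quay.io/modh/ray:2.35.0-py39-cu121",
    (3, 11): "quay.io/modh/ray:2.35.0-py311-cu121",
}


def default_ray_image(version=None):
    """Return the listed default image for a (major, minor) version, or None."""
    if version is None:
        version = sys.version_info[:2]
    return DEFAULT_IMAGES.get(tuple(version))


print(default_ray_image((3, 11)))
print(default_ray_image())  # image for the interpreter running this cell, if listed
```

A result of `None` means the running Python version is not one of the two listed above, in which case setting the `image` field explicitly is the safer choice.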

In [3]:
# Create our cluster and submit
# The SDK will try to find the name of your default local queue based on the annotation "kueue.x-k8s.io/default-queue": "true" unless you specify the local queue manually below
cluster_name = "hfgputest"
cluster = Cluster(ClusterConfiguration(name=cluster_name, 
                                       head_extended_resource_requests={'nvidia.com/gpu':1}, # For GPU enabled workloads set the head_extended_resource_requests and worker_extended_resource_requests
                                       worker_extended_resource_requests={'nvidia.com/gpu':1},
                                       num_workers=1,
                                       worker_cpu_requests=2, 
                                       worker_cpu_limits=3, 
                                       worker_memory_requests=8, 
                                       worker_memory_limits=8, 
                                       # image="", # Optional Field 
                                       write_to_file=False, # When enabled Ray Cluster yaml files are written to /HOME/.codeflare/resources 
                                       # local_queue="local-queue-name" # Specify the local queue manually
                                       ))

Yaml resources loaded for hfgputest



Next, we want to bring our cluster up, so we call the `up()` function below to submit our Ray Cluster onto the queue, and begin the process of obtaining our resource cluster.

In [None]:
cluster.up()

Now, we want to check on the initial status of our resource cluster, then wait until it is finally ready for use.

In [4]:
cluster.status()

(<CodeFlareClusterStatus.READY: 1>, True)

In [None]:
cluster.status()
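Checking the status repeatedly by hand amounts to polling. The SDK also offers `cluster.wait_ready()` for this purpose; the sketch below only illustrates the general pattern with a hypothetical helper, `wait_until_ready`, and a stand-in check function, so it can run without a cluster.

```python
import time


def wait_until_ready(is_ready, timeout=60.0, interval=1.0):
    """Poll is_ready() until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if is_ready():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Stand-in for a real status check: reports ready on the third call.
calls = {"n": 0}


def fake_check():
    calls["n"] += 1
    return calls["n"] >= 3


ready = wait_until_ready(fake_check, timeout=5.0, interval=0.01)
print(ready, calls["n"])
```

Against a real cluster the check might be `lambda: cluster.status()[1]`, assuming the second element of the returned tuple is the readiness flag, as the output above suggests.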

Let's quickly verify that the specs of the cluster are as expected.

In [5]:
cluster.details()

RayCluster(name='hfgputest', status=<RayClusterStatus.READY: 'ready'>, head_cpu_requests=2, head_cpu_limits=2, head_mem_requests='8G', head_mem_limits='8G', num_workers=1, worker_mem_requests='8G', worker_mem_limits='8G', worker_cpu_requests=2, worker_cpu_limits=3, namespace='akram', dashboard='https://ray-dashboard-hfgputest-akram.apps.rosa.akram.3psf.p3.openshiftapps.com', worker_extended_resources={'nvidia.com/gpu': 1}, head_extended_resources={'nvidia.com/gpu': 1})
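One spec worth checking is the GPU count, because the training step later in this notebook requests two GPU workers. A small arithmetic sketch, using the values from the output above and assuming each training worker needs one GPU and that the head node's GPU is available for scheduling (both are assumptions, and `total_gpus` is a hypothetical helper):

```python
def total_gpus(head_gpus, num_workers, gpus_per_worker):
    """GPUs requested across the head node and all worker nodes."""
    return head_gpus + num_workers * gpus_per_worker


# Values taken from the cluster details shown above.
available = total_gpus(head_gpus=1, num_workers=1, gpus_per_worker=1)

# Assumption: two training workers, one GPU each.
needed = 2 * 1

print(available, needed, available >= needed)
```

If `available` were smaller than `needed`, the training run would have to wait for resources that the cluster cannot provide.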

In [6]:
ray_cluster_uri = cluster.cluster_uri()

Now we can prepare to connect directly to our Ray cluster via the Ray Python client. First, we generate the TLS certificate that the connection requires:

In [7]:
from codeflare_sdk import generate_cert
# Create required TLS cert and export the environment variables to enable TLS
generate_cert.generate_tls_cert(cluster_name, cluster.config.namespace)
generate_cert.export_env(cluster_name, cluster.config.namespace)

**NOTE**: Now we have our resource cluster with the desired GPUs, so we can interact with it to train the HuggingFace model.

In [8]:
# Before proceeding, make sure the cluster exists and the URI is not empty
assert ray_cluster_uri, "Ray cluster needs to be started and set before proceeding"

import ray
import os
from ray.train import CheckpointConfig, RunConfig
from ray.train import ScalingConfig

# Reset the Ray context in case one already exists.
ray.shutdown()

# Install the additional libraries required for this training run.
runtime_env = {"pip": ["transformers==4.41.2",
                       "datasets==2.17.0",
                       "accelerate==0.31.0",
                       "scikit-learn==1.5.0"]}

# Establish the connection to the Ray cluster.
# NOTE: This will work for in-cluster notebook servers (RHODS/ODH), but not for local machines
# To see how to connect from your laptop, go to demo-notebooks/additional-demos/local_interactive.ipynb
ray.init(address=ray_cluster_uri, runtime_env=runtime_env)

print("Ray cluster is up and running: ", ray.is_initialized())

bucket_name = os.getenv("AWS_S3_BUCKET", "default_bucket_name")  # Use a default value if the env var is not set
storage_path = f"s3://{bucket_name}/data/"
print(os.getenv("AWS_S3_ENDPOINT"))
print("storage_path: ", storage_path)

checkpoint_config = CheckpointConfig(num_to_keep=3)
print("checkpoint_config: ", checkpoint_config)

run_config = RunConfig(storage_path=storage_path,
                       checkpoint_config=checkpoint_config)
print("run_config: ", run_config)

scaling_config = ScalingConfig(num_workers=2, use_gpu=True)
print("scaling_config: ", scaling_config)


SIGTERM handler is not set because current thread is not the main thread.


Ray cluster is up and running:  True
https://akram-rhoaieng-1990.s3.eu-north-1.amazonaws.com/
storage_path:  s3://akram-rhoaieng-1990/data/
checkpoint_config:  CheckpointConfig(num_to_keep=3)
run_config:  RunConfig(storage_path='s3://akram-rhoaieng-1990/data/', checkpoint_config=CheckpointConfig(num_to_keep=3), verbose=1)
scaling_config:  ScalingConfig(num_workers=2, use_gpu=True)


**NOTE**: Because this task needs additional pip packages, we install them by passing them in the `runtime_env` variable.
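Keeping the pinned versions in one mapping makes them easier to review and update. The sketch below builds the same `runtime_env` dictionary from such a mapping; `build_runtime_env` is a hypothetical helper written for this example, not part of Ray.

```python
def build_runtime_env(pins):
    """Turn a {package: version} mapping into a runtime_env dictionary."""
    return {"pip": [f"{name}=={version}" for name, version in pins.items()]}


# The same pins used in the cell above.
pins = {
    "transformers": "4.41.2",
    "datasets": "2.17.0",
    "accelerate": "0.31.0",
    "scikit-learn": "1.5.0",
}
runtime_env = build_runtime_env(pins)
print(runtime_env["pip"])
```

The result is equal to the dictionary passed to `ray.init` above, so it can be used in its place.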

### Transfer learning code from Hugging Face

We are using the code based on the examples **[here](https://huggingface.co/docs/transformers/tasks/sequence_classification)** and **[here](https://docs.ray.io/en/latest/train/getting-started-transformers.html)**. 

In [9]:
@ray.remote
def train_fn():
    import os
    import numpy as np
    from datasets import load_dataset, load_metric
    import transformers
    from transformers import (
        Trainer,
        TrainingArguments,
        AutoTokenizer,
        AutoModelForSequenceClassification,
    )
    import ray.train.huggingface.transformers
    from ray.train.torch import TorchTrainer

    # When running in a multi-node cluster you will need persistent storage that is accessible across all worker nodes. 
    # See www.github.com/project-codeflare/codeflare-sdk/tree/main/docs/s3-compatible-storage.md for more information.
    
    def train_func():
        # Datasets
        dataset = load_dataset("imdb")
        tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")

        def tokenize_function(examples):
            return tokenizer(examples["text"], padding="max_length", truncation=True)

        small_train_dataset = (
            dataset["train"].select(range(100)).map(tokenize_function, batched=True)
        )
        small_eval_dataset = (
            dataset["test"].select(range(100)).map(tokenize_function, batched=True)
        )

        # Model
        model = AutoModelForSequenceClassification.from_pretrained(
            "distilbert-base-uncased", num_labels=2
        )

        def compute_metrics(eval_pred):
            metric = load_metric("accuracy")
            logits, labels = eval_pred
            predictions = np.argmax(logits, axis=-1)
            return metric.compute(predictions=predictions, references=labels)

        # Hugging Face Trainer
        training_args = TrainingArguments(
            output_dir="test_trainer",
            evaluation_strategy="epoch",
            save_strategy="epoch",
            report_to="none",
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=small_train_dataset,
            eval_dataset=small_eval_dataset,
            compute_metrics=compute_metrics,
        )
        callback = ray.train.huggingface.transformers.RayTrainReportCallback()
        trainer.add_callback(callback)
        trainer = ray.train.huggingface.transformers.prepare_trainer(trainer)
        trainer.train()

    # Launch the distributed training run. scaling_config and run_config are
    # defined in the notebook cell above and captured by this remote function.
    ray_trainer = TorchTrainer(
        train_func,
        scaling_config=scaling_config,
        # run_config must point at persistent storage that is accessible
        # across all worker nodes.
        run_config=run_config,
    )
    result: ray.train.Result = ray_trainer.fit()

**NOTE:** This code produces a lot of output and runs for **approximately 2 minutes.** During execution it downloads the `imdb` dataset and the `distilbert-base-uncased` model, then starts the transfer learning task that trains the model on this dataset.
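The `compute_metrics` function in the cell above takes the highest-scoring class for each review and compares it with the true label. The sketch below reproduces that calculation in plain Python on made-up numbers, so the logic can be followed without downloading anything; `accuracy_from_logits` is a hypothetical helper and not the Hugging Face metric implementation.

```python
def accuracy_from_logits(logits, labels):
    """Fraction of rows whose highest-scoring class matches the label."""
    if len(logits) != len(labels):
        raise ValueError("logits and labels must have the same length")
    if not labels:
        raise ValueError("at least one example is required")
    # Index of the largest score in each row (first index wins on ties).
    predictions = [max(range(len(row)), key=row.__getitem__) for row in logits]
    correct = sum(int(p == y) for p, y in zip(predictions, labels))
    return {"accuracy": correct / len(labels)}


# Two classes, four made-up examples.
logits = [[2.0, -1.0], [0.1, 0.3], [-0.5, 1.5], [1.2, 0.9]]
labels = [0, 1, 0, 0]
print(accuracy_from_logits(logits, labels))  # {'accuracy': 0.75}
```

Three of the four predicted classes (0, 1, 1, 0) match the labels, which gives the accuracy of 0.75.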

In [10]:
# Call the function defined above as a remote Ray task
ray.get(train_fn.remote())

RayTaskError(OSError): ray::train_fn() (pid=30872, ip=10.129.2.37)
  File "/tmp/ipykernel_10124/2969632525.py", line 73, in train_fn
  File "/tmp/ray/session_2024-11-15_14-27-34_990179_1/runtime_resources/pip/9722261244c711db4e388426f42e7dd7dd95f293/virtualenv/lib64/python3.11/site-packages/ray/train/base_trainer.py", line 589, in fit
    storage = StorageContext(
              ^^^^^^^^^^^^^^^
  File "/tmp/ray/session_2024-11-15_14-27-34_990179_1/runtime_resources/pip/9722261244c711db4e388426f42e7dd7dd95f293/virtualenv/lib64/python3.11/site-packages/ray/train/_internal/storage.py", line 455, in __init__
    self._create_validation_file()
  File "/tmp/ray/session_2024-11-15_14-27-34_990179_1/runtime_resources/pip/9722261244c711db4e388426f42e7dd7dd95f293/virtualenv/lib64/python3.11/site-packages/ray/train/_internal/storage.py", line 483, in _create_validation_file
    self.storage_filesystem.create_dir(self.experiment_fs_path)
  File "pyarrow/_fs.pyx", line 612, in pyarrow._fs.FileSystem.create_dir
  File "pyarrow/error.pxi", line 92, in pyarrow.lib.check_status
OSError: When testing for existence of bucket 'akram-rhoaieng-1990': AWS Error ACCESS_DENIED during HeadBucket operation: No response body.
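The traceback above shows the run stopping while Ray Train validates the storage path: the `HeadBucket` call on the S3 bucket was answered with `ACCESS_DENIED`. One inexpensive check before launching is to confirm that the expected S3 settings are present at all. The sketch below is a minimal version of such a check. `AWS_S3_BUCKET` and `AWS_S3_ENDPOINT` are the variables this notebook reads, the two credential names are the standard AWS ones, and treating all four as required is an assumption; `missing_s3_settings` is a hypothetical helper.

```python
import os

# Treating all four settings as required is an assumption of this sketch.
REQUIRED_S3_SETTINGS = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_S3_BUCKET",
    "AWS_S3_ENDPOINT",
)


def missing_s3_settings(env=None):
    """Return the names of required settings that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_S3_SETTINGS if not env.get(name)]


# Example with an explicit mapping that lacks the credentials.
example_env = {
    "AWS_S3_BUCKET": "my-bucket",
    "AWS_S3_ENDPOINT": "https://s3.example.com",
}
print(missing_s3_settings(example_env))
```

An empty list only means the variables are set; it does not prove that the credentials grant access to the bucket. Variables set in the notebook kernel are also not necessarily visible on the Ray workers, and Ray's `runtime_env` accepts an `env_vars` mapping for passing them along.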

Finally, we bring our resource cluster down and release/terminate the associated resources, bringing everything back to the way it was before our cluster was brought up.

In [None]:
cluster.down()

In [None]:
auth.logout()

## Conclusion
As shown in the above example, you can run your Hugging Face transfer learning tasks easily and natively on CodeFlare. You can scale them from 1 to n GPUs without significant code changes while still using the native Hugging Face trainer.