# PyTorch distributed data processing and training with @ray.remote

***

## Prerequisites

In [None]:
%pip install -r ./scripts/requirements.txt --upgrade

In [None]:
# Copy Ray launcher script to the scripts directory. 
%cp ../../../scripts/launcher.py ./scripts/

***

# Dataset

The dataset (The Social Dilemma Tweets - Text Classification 2020) was downloaded from [Kaggle](https://www.kaggle.com/datasets/kaushiksuresh147/the-social-dilemma-tweets).
It contains Twitter responses posted with the #TheSocialDilemma hashtag by viewers of the documentary "The Social Dilemma", released on the OTT platform Netflix on September 9th, 2020.
The dataset was extracted using the Twitter API and contains roughly 10,526 tweets from Twitter users all over the globe.

We want to train a model that predicts the sentiment of a tweet from its text.

This is a multi-class classification problem:
* Negative - 0
* Neutral - 1
* Positive - 2

In [None]:
! rm -rf ./data && mkdir -p ./data
! curl https://sagemaker-sample-files.s3.amazonaws.com/datasets/tabular/tweets_dataset/TheSocialDilemma.csv -o ./data/data.csv

# Step 1 - Import Modules

Here we’ll import some libraries and define some variables.

In [None]:
import os

# Optional: when running outside SageMaker (e.g. locally), uncomment the line
# below and set it to a named AWS CLI profile so the boto3/sagemaker clients
# created later use that profile's credentials.
# os.environ["AWS_PROFILE"] = "<aws_profile>"

In [None]:
import boto3
import sagemaker

In [None]:
# Low-level boto3 clients for the SageMaker and S3 APIs.
sagemaker_client, s3_client = boto3.client("sagemaker"), boto3.client("s3")

Create a SageMaker session and store the default region and the execution role in Python variables.

In [None]:
# SageMaker session, the AWS region of the default boto3 session, and the IAM
# execution role the training job will assume.
sagemaker_session = sagemaker.Session()
boto_session = boto3.session.Session()
region = boto_session.region_name
role = sagemaker.get_execution_role()

In [None]:
# Default SageMaker S3 bucket; the dataset cleanup and upload below target it.
bucket_name: str = sagemaker_session.default_bucket()

## Upload the dataset in the default Amazon S3 Bucket

To make the data available to the SageMaker training job, copy the dataset to the default S3 bucket.

In [None]:
# Remove objects left under the input prefix by a previous run, then upload the
# local CSV. S3 has no real directories: the uploaded object lives at
# ".../e2e-base/data/input/data.csv", so calling delete_object on the bare
# prefix key deletes nothing. Every object under the prefix must be listed and
# deleted explicitly.
input_prefix = "e2e-base/data/input"

# upload_data() nests key_prefix under the session's default bucket prefix when
# one is configured, so the cleanup has to look in the same place.
bucket_prefix = sagemaker_session.default_bucket_prefix
cleanup_prefix = f"{bucket_prefix}/{input_prefix}" if bucket_prefix else input_prefix

paginator = s3_client.get_paginator("list_objects_v2")
# The trailing "/" keeps sibling prefixes such as ".../input-old" untouched.
for page in paginator.paginate(Bucket=bucket_name, Prefix=f"{cleanup_prefix}/"):
    stale_objects = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
    if stale_objects:
        # Each page holds at most 1000 keys, which is also delete_objects' limit.
        s3_client.delete_objects(Bucket=bucket_name, Delete={"Objects": stale_objects})

input_data = sagemaker_session.upload_data("./data/data.csv", key_prefix=input_prefix)

input_data

***

## (Optional) Copy Prometheus binary

If you want to avoid having Ray download Prometheus at runtime, you can upload the binary to S3 and pass it to the training job as an input channel.

In [None]:
! wget https://github.com/prometheus/prometheus/releases/download/v3.4.2/prometheus-3.4.2.linux-amd64.tar.gz

In [None]:
import boto3
import sagemaker

In [None]:
# Re-create the session and S3 client so this optional section can run on its own.
sagemaker_session = sagemaker.Session()
s3_client = boto3.client("s3")

# Target bucket and the (possibly unset) default key prefix inside it.
bucket_name, default_prefix = (
    sagemaker_session.default_bucket(),
    sagemaker_session.default_bucket_prefix,
)

In [None]:
# Build the S3 key prefix (and full S3 URI) under which the Prometheus archive is
# stored. The session's default bucket prefix is used when one is configured.
# NOTE(review): the folder name looks copied from an LLM fine-tuning sample;
# consider a project-specific name (the upload cell reads `input_path`).
if default_prefix:
    input_path = f"{default_prefix}/datasets/llm-fine-tuning-modeltrainer-sft-ray"
else:
    input_path = "datasets/llm-fine-tuning-modeltrainer-sft-ray"

prometheus_s3_path = (
    f"s3://{bucket_name}/{input_path}/prometheus/prometheus-3.4.2.linux-amd64.tar.gz"
)

In [None]:
# Upload the Prometheus archive downloaded above to S3. The training job can then
# receive it as an input channel instead of Ray downloading it at runtime.
prometheus_archive = "prometheus-3.4.2.linux-amd64.tar.gz"

s3_client.upload_file(
    f"./{prometheus_archive}",
    bucket_name,
    f"{input_path}/prometheus/{prometheus_archive}",
)

print("Prometheus binary uploaded to:")
print(prometheus_s3_path)

***

# Step 2 - Run the job

In [None]:
! pygmentize ./scripts/train.py

In [None]:
import sagemaker
from sagemaker.config import load_sagemaker_config

In [None]:
# Fresh session for Step 2, plus the target bucket, the optional default key
# prefix and any SageMaker config-file defaults.
sagemaker_session = sagemaker.Session()
bucket_name, default_prefix = (
    sagemaker_session.default_bucket(),
    sagemaker_session.default_bucket_prefix,
)
configs = load_sagemaker_config()

In [None]:
from sagemaker.instance_group import InstanceGroup

# Heterogeneous Ray cluster, one row per group: (name, instance type, count).
# ml.g5.* is a GPU instance family; ml.t3.* instances are CPU-only.
instance_group_specs = [
    ("head-instance-group", "ml.t3.large", 1),
    ("worker-instance-group-1", "ml.g5.xlarge", 1),
    ("worker-instance-group-2", "ml.t3.xlarge", 1),
]

instance_groups = [
    InstanceGroup(
        instance_group_name=group_name,
        instance_type=group_instance_type,
        instance_count=group_instance_count,
    )
    for group_name, group_instance_type, group_instance_count in instance_group_specs
]

instance_groups

In [None]:
# Resolve the PyTorch training image from the instance type of the GPU worker
# group. Looking the group up by name, rather than by list position, keeps this
# correct if the instance groups above are reordered or extended.
gpu_group_name = "worker-instance-group-1"
gpu_instance_group = next(
    (group for group in instance_groups if group.instance_group_name == gpu_group_name),
    None,
)
if gpu_instance_group is None:
    raise ValueError(f"No instance group named {gpu_group_name!r} in instance_groups")

image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=sagemaker_session.boto_session.region_name,
    version="2.6.0",
    instance_type=gpu_instance_group.instance_type,
    image_scope="training",
)

image_uri

In [None]:
from sagemaker.pytorch import PyTorch

# Training job name; also used as the base job name and the output folder name.
job_name = "train-ray-processing-train"

# OutputDataConfig location, nested under the default bucket prefix when set.
output_path = (
    f"s3://{bucket_name}/{default_prefix}/{job_name}"
    if default_prefix
    else f"s3://{bucket_name}/{job_name}"
)

# Container environment for the job (presumably consumed by launcher.py and Ray).
# TODO: replace <PROMETHEUS_HOST> / <GRAFANA_HOST> with your own endpoints.
ray_launcher_env = {
    "wait_shutdown": "300",  # Wait 5 minutes before shutting down the cluster to analyze Ray Dashboard
    "head_instance_group": "head-instance-group",
    "head_num_cpus": "0",
    "head_num_gpus": "0",
    # "launch_prometheus": "true", # enable for local prometheus
    "RAY_PROMETHEUS_HOST": "<PROMETHEUS_HOST>",
    "RAY_GRAFANA_HOST": "<GRAFANA_HOST>",
    "RAY_PROMETHEUS_NAME": "prometheus",
}

# "entrypoint" names the script (shown above) that launcher.py runs; the rest
# are training hyperparameters.
training_hyperparameters = {
    "entrypoint": "train.py",
    "epochs": 100,
    "learning_rate": 0.001,
    "batch_size": 100,
}

estimator = PyTorch(
    source_dir="./scripts",
    entry_point="launcher.py",
    output_path=output_path,
    base_job_name=job_name,
    role=role,
    instance_groups=instance_groups,
    image_uri=image_uri,
    environment=ray_launcher_env,
    hyperparameters=training_hyperparameters,
    enable_remote_debug=True,
)

In [None]:
from sagemaker.inputs import TrainingInput

# Make every channel available on every instance group of the heterogeneous
# cluster. Deriving the names from `instance_groups` keeps the channels in sync
# with the cluster definition instead of repeating the group names by hand.
channel_instance_groups = [group.instance_group_name for group in instance_groups]

train_input = TrainingInput(
    s3_data_type="S3Prefix",  # Available Options: S3Prefix | ManifestFile | AugmentedManifestFile
    s3_data=input_data,
    distribution="FullyReplicated",  # Available Options: FullyReplicated | ShardedByS3Key
    instance_groups=channel_instance_groups,
)

## Uncomment these lines if you want to provide the Prometheus binary
## (uploaded in the optional section above) as an input channel.

# prometheus_input = TrainingInput(
#     s3_data_type="S3Prefix",  # Available Options: S3Prefix | ManifestFile | AugmentedManifestFile
#     s3_data=prometheus_s3_path,
#     distribution="FullyReplicated",  # Available Options: FullyReplicated | ShardedByS3Key
#     instance_groups=channel_instance_groups,
# )

# Check input channels configured
data = {
    "processing": train_input,
    # "prometheus": prometheus_input,
}
data

In [None]:
# Submit the training job with the input channels above; wait=False returns
# without blocking on job completion.
estimator.fit(wait=False, inputs=data)