<a href="https://colab.research.google.com/github/ShaswataJash/Ray/blob/master/Ray_progam_on_Google_Cloud_Platform.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**INTRODUCTION: This notebook demonstrates how to create a ray-cluster (https://docs.ray.io/en/latest/installation.html) on Google Cloud Platform (GCP) and then run a ray-program on it remotely from a google-colab notebook.**

In [None]:
!whoami
!pwd
!python -V

**Install the ray library. This notebook was tested against version 2.2.0 of ray.**

In [None]:
!pip install ray==2.2.0

**Run the following cell only during development on the local node. It creates a local ray-cluster with a single node, which acts as the head-node of the cluster. Develop your ray-program against this local cluster. Once the ray-program works, save it as a Python (.py) file that can later be submitted to a remote ray-cluster running on Google Cloud Platform (GCP).**

In [None]:
!ray stop
!rm -Rf /tmp/ray
!ray start --head --port=6379 --object-manager-port=8076 --include-dashboard=True --disable-usage-stats --verbose &

**Create the following directory on the local node; the ray-program writes its final output there. While developing the ray-program locally, write the output to this directory on the local node. The same directory path is used on the remote ray-cluster, so the ray-program does not need to change whether it runs locally or remotely.**

In [None]:
!mkdir -p /tmp/ray_local_directory_mount

**Note the %%writefile Jupyter cell magic used in the following cell. While developing the ray-program locally, comment out %%writefile as follows:**

`#%%writefile ray_test.py`

**Once the ray-program works on the local node, restore %%writefile and run the cell to write its content to a Python (.py) file, which can then be submitted to the remote ray-cluster.**


In [None]:
%%writefile ray_test.py
# Test program taken from https://towardsdatascience.com/how-to-scale-python-on-every-major-cloud-provider-5e5df3e88274
# Note: %%writefile is a cell magic, so it has to be the first line of the cell.

from collections import Counter
import socket
import os
import time
import ray

ray.shutdown()
ray.init(address='auto', ignore_reinit_error=True)

print('This cluster consists of {} nodes and {} CPU resources in total'.format(len(ray.nodes()), ray.cluster_resources()['CPU']))

@ray.remote(num_cpus=0.5)
def f():
    time.sleep(0.001)
    # Return the IP address of the node and the PID of the worker process.
    return socket.gethostbyname(socket.gethostname()), os.getpid()

object_ids = [f.remote() for _ in range(1000000)]
ipaddresses_pids = ray.get(object_ids)

with open('/tmp/ray_local_directory_mount/task_stat.txt', 'w') as writer:
    print('Tasks executed')
    for (ip_address, pid), num_tasks in Counter(ipaddresses_pids).items():
        rowToWrite = '{} tasks on {}:pid{}\n'.format(num_tasks, ip_address, pid)
        print(rowToWrite)
        writer.write(rowToWrite)

ray.shutdown()
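
The aggregation step at the end of the program can be tried without any cluster. The following is a minimal sketch in which hand-made `(ip_address, pid)` tuples (hypothetical values, not real output) stand in for the list returned by `ray.get`. The helper name `format_task_rows` is also hypothetical. It shows how `Counter` groups identical tuples and how each row is formatted.

```python
from collections import Counter

# Hypothetical stand-in for the list returned by ray.get(object_ids):
# one (ip_address, pid) tuple per finished task.
sample_results = [
    ("10.0.0.2", 101),
    ("10.0.0.2", 101),
    ("10.0.0.3", 202),
    ("10.0.0.2", 102),
]


def format_task_rows(ipaddresses_pids):
    """Count tasks per (ip, pid) pair and format one row per pair."""
    rows = []
    for (ip_address, pid), num_tasks in Counter(ipaddresses_pids).items():
        rows.append('{} tasks on {}:pid{}\n'.format(num_tasks, ip_address, pid))
    return rows


for row in format_task_rows(sample_results):
    print(row, end='')
```

Each distinct worker process gets its own row, so one node usually appears several times with different PIDs.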

**Run the following cell only during development on the local node. It shows the content written by the ray-program and then stops the local ray-cluster (a single-node cluster with only the head-node).**

In [None]:
!cat /tmp/ray_local_directory_mount/task_stat.txt
!ray stop -v

**First install the google-api-python-client and cryptography libraries, which ray needs to set up the remote cluster in GCP.**

In [None]:
!pip install google-api-python-client==2.75.0
!pip install cryptography==39.0.0 #needed internally by ray up

In [None]:
# Refer: https://cloud.google.com/iam/docs/service-accounts (see the 'User-managed service accounts' section).
# Create a new user-managed service account for the ray experiment from 'IAM & Admin' > 'Service Accounts'.
# Give it only the 'Editor' role for the GCP project (it is good practice not to give 'Owner' access to the project).
# Refer: https://cloud.google.com/iam/docs/creating-managing-service-account-keys for how to generate the service-account JSON key file.
# Rename that file to 'gcp_service_account.json' and then upload it to google colab using the 'upload to session storage' icon.
import os
import json
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/content/gcp_service_account.json"

with open(os.environ["GOOGLE_APPLICATION_CREDENTIALS"], 'r') as gcp_service_acc_file:
    data = json.load(gcp_service_acc_file)
    os.environ["CLOUDSDK_CORE_PROJECT"] = data['project_id']
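
Before relying on the key file, it can help to check that it really looks like a service-account key. The sketch below assumes the usual key layout, in which the JSON contains `type`, `project_id`, `client_email` and `private_key`. The helper name `read_project_id` is hypothetical and not part of any library.

```python
import json


def read_project_id(key_path):
    """Return project_id from a service-account key file, or raise ValueError."""
    with open(key_path, 'r') as key_file:
        data = json.load(key_file)
    # Fields assumed to be present in a service-account key file.
    required = ('type', 'project_id', 'client_email', 'private_key')
    missing = [name for name in required if name not in data]
    if missing:
        raise ValueError('key file is missing fields: {}'.format(', '.join(missing)))
    if data['type'] != 'service_account':
        raise ValueError('expected type "service_account", got {!r}'.format(data['type']))
    return data['project_id']
```

The returned value could then be assigned to `os.environ["CLOUDSDK_CORE_PROJECT"]` exactly as in the cell above.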

Refer: https://cloud.google.com/compute/docs/images#gcloud for how to list the available Compute Engine images with gcloud.

In [None]:
!gcloud auth login

In [None]:
!gcloud compute images list --format="csv(diskSizeGb,archiveSizeBytes,NAME,PROJECT,FAMILY)" > compute_images_list.csv

In [None]:
import pandas as pd
compute_images_list_df = pd.read_csv('compute_images_list.csv')
compute_images_list_df.sort_values(by=['archive_size_bytes'], inplace=True)
pd.set_option('display.max_rows', compute_images_list_df.shape[0]+1)
compute_images_list_df

In [None]:
#!gcloud compute images list --project deeplearning-platform-release --no-standard-images --format="table(diskSizeGb,NAME,PROJECT,FAMILY)" 
!gcloud compute images list --project deeplearning-platform-release --no-standard-images --format="csv(diskSizeGb,archiveSizeBytes,NAME,PROJECT,FAMILY)" > deeplearning_platform_release_image_list.csv

In [None]:
import pandas as pd
deeplearning_platform_release_image_list_df = pd.read_csv('deeplearning_platform_release_image_list.csv')
deeplearning_platform_release_image_list_df.sort_values(by=['archive_size_bytes'], inplace=True)
pd.set_option('display.max_rows', deeplearning_platform_release_image_list_df.shape[0]+1)
deeplearning_platform_release_image_list_df

**The following YAML file is the overall configuration file that defines the topology of the remote ray-cluster. Note the {{GCP_PROJECT_ID}} placeholder, which is later replaced by your GCP project id extracted from your service-account key file (in JSON form). The total number of CPUs that you can allocate is limited by the quota of your GCP account. For example, a free-trial account may not be able to allocate more than 8 CPUs. The configuration below assumes that you have enabled billing in your GCP account. In a billable account, GCP allows 24 CPUs per region by default. Accordingly, the following configuration creates 1 head node with 4 CPUs and 10 additional worker nodes with 2 CPUs each (4 + 10 × 2 = 24 CPUs).**

In [None]:
%%writefile ray_gcp_cluster.yaml
# Refer: https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/gcp/example-full.yaml

# A unique identifier for the head node and workers of this cluster.
cluster_name: "ray-exp1"

# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 10

# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 10

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
#upscaling_speed: 1.0

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
  #image: "rayproject/ray-ml:latest-cpu" # You can change this to latest-gpu if you require gpu support
  image: rayproject/ray:latest-cpu   # use this one if you don't need ML dependencies, it's faster to pull
  container_name: "ray_container"
  # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
  # if no cached version is present.
  pull_before_run: False
  run_options:  # Extra options to pass into "docker run"
    - --ulimit nofile=65536:65536

  # Example of running a GPU head with CPU workers
  # head_image: "rayproject/ray-ml:latest-gpu"
  # Allow Ray to automatically detect GPUs

  # worker_image: "rayproject/ray-ml:latest-cpu"
  # worker_run_options: []

# If a node is idle for this many minutes, it will be removed.
#idle_timeout_minutes: 2

# Cloud-provider specific configuration.
provider:
    type: gcp
    region: us-central1
    availability_zone: us-central1-a
    project_id: {{GCP_PROJECT_ID}} # Globally unique project id; this placeholder is replaced later with your own GCP project id

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ray_user
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below. This requires that you have added the key into the
# project wide meta-data.
#    ssh_private_key: /path/to/your/key.pem

# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
    ray_head_default:
        # The resources provided by this node type.
        resources: {"CPU": 4}
        # Provider-specific config for the head node, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as subnets and ssh-keys.
        # For more documentation on available fields, see:
        # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
        node_config:
            machineType: n1-standard-4
            disks:
              - boot: true
                autoDelete: true
                type: PERSISTENT
                initializeParams:
                  diskSizeGb: 10
                  # See https://cloud.google.com/compute/docs/images for more images (choose only those images which have rsync installed)
                  #sourceImage: projects/deeplearning-platform-release/global/images/family/common-cpu
                  sourceImage: projects/ubuntu-os-cloud/global/images/family/ubuntu-2210-amd64

            # Additional options can be found in the compute docs at
            # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert

            # If the network interface is specified as below in both head and worker
            # nodes, the manual network config is used.  Otherwise an existing subnet is
            # used.  To use a shared subnet, ask the subnet owner to grant permission
            # for 'compute.subnetworks.use' to the ray autoscaler account...
            # networkInterfaces:
            #   - kind: compute#networkInterface
            #     subnetwork: path/to/subnet
            #     aliasIpRanges: []
    ray_worker_small:
        # The minimum number of worker nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 10
        # The maximum number of worker nodes of this type to launch.
        # This takes precedence over min_workers. (This should match with max_workers field kept on the top.)
        max_workers: 10
        # The resources provided by this node type.
        resources: {"CPU": 2}
        # Provider-specific config for the worker nodes, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as subnets and ssh-keys.
        # For more documentation on available fields, see:
        # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert
        node_config:
            machineType: n1-standard-2
            disks:
              - boot: true
                autoDelete: true
                type: PERSISTENT
                initializeParams:
                  diskSizeGb: 10
                  # See https://cloud.google.com/compute/docs/images for more images (choose only those images which have rsync installed)
                  #sourceImage: projects/deeplearning-platform-release/global/images/family/common-cpu
                  sourceImage: projects/ubuntu-os-cloud/global/images/family/ubuntu-2210-amd64
            # Run workers on preemptible instances by default.
            # Comment this out to use on-demand.
            scheduling:
              - preemptible: true
            # Un-Comment this to launch workers with the Service Account of the Head Node
            # serviceAccounts:
            # - email: ray-autoscaler-sa-v1@<project_id>.iam.gserviceaccount.com
            #   scopes:
            #   - https://www.googleapis.com/auth/cloud-platform

    # Additional options can be found in the compute docs at
    # https://cloud.google.com/compute/docs/reference/rest/v1/instances/insert

# Specify the node type of the head node (as configured above).
head_node_type: ray_head_default

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
}

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands:
    # Wait for auto upgrade that might run in the background (refer https://itsfoss.com/could-not-get-lock-error/ and https://github.com/ray-project/ray/issues/15893)
    - bash -c $'ps -e | grep apt | awk \'{print $1}\' | xargs tail -f --pid || true' 
    #get the OS info
    - uname -a
    - cat /etc/*-release
    - lsb_release -a
    #get the logged in user related info
    - id
    - w
    #disable ipv6 (https://itsfoss.com/disable-ipv6-ubuntu-linux/)
    - >-
      sudo sysctl -w net.ipv6.conf.all.disable_ipv6=1;
      sudo sysctl -w net.ipv6.conf.default.disable_ipv6=1;
      sudo sysctl -w net.ipv6.conf.lo.disable_ipv6=1
    #firewall is not enabled by default in ubuntu (list out iptable rules to verify that)
    - sudo iptables -L
    # Docker has to be installed for images from the 'ubuntu-os-cloud' project (for 'deeplearning-platform-release' images docker is pre-installed).
    # Note that 'ubuntu-os-cloud' images require just 10GB of disk space, whereas 'deeplearning-platform-release' images require a minimum of 30GB.
    - sudo apt-get update
    - sudo apt -y install docker.io
    - sudo snap install docker
    - sudo usermod -a -G docker ray_user
    # refer PaulFenton's comments in https://github.com/ray-project/ray/issues/15893
    #- sudo rm /var/run/docker.pid
    #refer https://stackoverflow.com/questions/43537790/docker-fails-to-start-due-to-volume-store-metadata-database-timeout
    - ps axf | grep docker | grep -v grep | awk '{print "kill -9 " $1}' | sudo sh
    - sudo dockerd --max-download-attempts 10 &
    #TODO: check dockerd has started successfully

# List of shell commands to run to set up nodes.
setup_commands: []    

# Custom commands that will be run on the head node after common setup.
head_setup_commands:
    - mkdir -p /tmp/ray_local_directory_mount
    - python --version
    # rayproject docker images have python, google-api-client and cryptography packages installed, so no need to install them in head nodes
    #- pip install google-api-python-client==2.75.0
    #- pip install --upgrade --force-reinstall cryptography==39.0.0

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this. (Without --dashboard-host=0.0.0.0, the dashboard binds to 127.0.0.1 by default.)
head_start_ray_commands:
    - ray stop
    - >-
      ray start
      --head
      --port=6379
      --object-manager-port=8076
      --include-dashboard=True --dashboard-host=0.0.0.0
      --disable-usage-stats --verbose
      --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - >-
      ray start
      --address=$RAY_HEAD_IP:6379
      --object-manager-port=8076
      --disable-usage-stats --verbose
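
As a quick sanity check on the sizing used in this configuration (one 4-CPU head node plus ten 2-CPU workers), the CPU budget can be verified with a few lines of arithmetic. This is only a sketch: the quota of 24 CPUs is the default mentioned earlier in this notebook, and the function name is hypothetical.

```python
def total_cluster_cpus(head_cpus, worker_count, cpus_per_worker):
    """Total CPUs requested by one head node plus identical worker nodes."""
    return head_cpus + worker_count * cpus_per_worker


# Default regional CPU quota assumed in this notebook; adjust to your account.
REGIONAL_CPU_QUOTA = 24

requested = total_cluster_cpus(head_cpus=4, worker_count=10, cpus_per_worker=2)
print('requested {} of {} CPUs'.format(requested, REGIONAL_CPU_QUOTA))
assert requested <= REGIONAL_CPU_QUOTA, 'cluster exceeds the CPU quota'
```

If the quota of your account is lower, reduce `min_workers` and `max_workers` until the total fits.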

In [None]:
import json
import os

with open(os.environ["GOOGLE_APPLICATION_CREDENTIALS"], 'r') as gcp_service_acc_file:
    data = json.load(gcp_service_acc_file)
    with open('ray_gcp_cluster.yaml', 'r') as ray_cluster_config_file:
        config = ray_cluster_config_file.read().replace('{{GCP_PROJECT_ID}}', data['project_id'])
    with open('ray_gcp_cluster.yaml', 'w') as ray_cluster_config_file:
        ray_cluster_config_file.write(config)
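
A slightly more defensive variant of this substitution is sketched below. It leaves the template untouched by writing to a separate output file, and it fails loudly if the placeholder is absent (for example because the file was already rendered). The function name and the idea of a separate output file are assumptions made for illustration.

```python
def render_cluster_config(template_path, output_path, project_id,
                          placeholder='{{GCP_PROJECT_ID}}'):
    """Copy template_path to output_path with the placeholder replaced."""
    with open(template_path, 'r') as template_file:
        template = template_file.read()
    if placeholder not in template:
        raise ValueError('placeholder {} not found in {}'.format(placeholder, template_path))
    with open(output_path, 'w') as output_file:
        output_file.write(template.replace(placeholder, project_id))
    return output_path
```

With this approach the later `ray` commands would have to point at the rendered file instead of `ray_gcp_cluster.yaml`.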

In [None]:
!cat ray_gcp_cluster.yaml

In [None]:
!rm -rf /tmp/*
!mkdir -p /tmp/ray_local_directory_mount
!echo "Content of /tmp"
!ls -la /tmp
!rm -rf /root/.ssh/*
!mkdir -p /root/.ssh
!echo "Content of /root"
!ls -la /root
!echo "Content of /root/.ssh"
!ls -la /root/.ssh

In [None]:
!ray disable-usage-stats



`ray up` may fail with an error like the following:

```
New status: update-failed
!!!
 {'message': 'SSH command failed.'}
 SSH command failed.
!!!
```
If that happens, rerun the `ray up` command until the required number of worker nodes is ready.
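
The rerun can also be scripted. The following sketch retries an arbitrary command until it exits with status 0. It assumes that `ray up` returns a non-zero exit code when it fails, which has not been verified here, and the helper name `run_until_success` is hypothetical. A zero exit code does not by itself prove that every worker node is ready, so the cluster state should still be checked afterwards.

```python
import subprocess
import time


def run_until_success(command, max_attempts=5, delay_seconds=30):
    """Run command repeatedly until it exits with status 0.

    Returns the number of attempts used, or raises RuntimeError if every
    attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        result = subprocess.run(command)
        if result.returncode == 0:
            return attempt
        print('attempt {} failed with exit code {}'.format(attempt, result.returncode))
        if attempt < max_attempts:
            # Wait before the next attempt.
            time.sleep(delay_seconds)
    raise RuntimeError('command failed after {} attempts'.format(max_attempts))
```

A possible call would be `run_until_success(['ray', 'up', '--yes', 'ray_gcp_cluster.yaml'])`.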




In [None]:
!ray up --help

In [None]:
!ray up --yes --verbose ray_gcp_cluster.yaml

In [None]:
!ray exec /content/ray_gcp_cluster.yaml 'tail -n 100 -f /tmp/ray/session_latest/logs/monitor*'

In [None]:
!nohup ray monitor --lines 5 ray_gcp_cluster.yaml &

In [None]:
!ray submit --help

In [None]:
!ray submit ray_gcp_cluster.yaml ray_test.py

In [None]:
!ray rsync-down --verbose ray_gcp_cluster.yaml /tmp/ray_local_directory_mount/task_stat.txt /tmp/ray_local_directory_mount

In [None]:
!cat /tmp/ray_local_directory_mount/task_stat.txt
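
The downloaded file can also be summarised per node instead of per worker process. The sketch below assumes each line has the form written by `ray_test.py`, that is `<count> tasks on <ip>:pid<pid>`. The function name `tasks_per_node` is hypothetical.

```python
import re
from collections import Counter

# Matches rows such as "1234 tasks on 10.128.0.5:pid42".
ROW_PATTERN = re.compile(r'^(\d+) tasks on (.+):pid(\d+)$')


def tasks_per_node(lines):
    """Sum task counts per IP address from task_stat.txt-style lines."""
    totals = Counter()
    for line in lines:
        match = ROW_PATTERN.match(line.strip())
        if match:  # skip blank or unrecognised lines
            num_tasks, ip_address, _pid = match.groups()
            totals[ip_address] += int(num_tasks)
    return totals
```

Passing the open file object for `/tmp/ray_local_directory_mount/task_stat.txt` as `lines` gives one total per IP address, which makes it easier to see how evenly the tasks were spread across the nodes.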

In [None]:
!ray down --yes --verbose ray_gcp_cluster.yaml