In [1]:
import os
import sys
sys.path.append(os.path.dirname(os.path.realpath('__file__')) + '/../../../')
from acaisdk.file import File
from acaisdk.project import Project
from acaisdk.fileset import FileSet
from acaisdk.job import Job, JobStatus
from acaisdk.meta import *
from acaisdk.utils import utils
from acaisdk import credentials

# '__file__' is a quoted string here, not the __file__ attribute: realpath()
# resolves that literal name against the current working directory, and
# dirname() then strips it again, leaving the directory itself.
_marker_path = os.path.realpath('__file__')
workspace = os.path.dirname(_marker_path)

# Print debug messages. Calls are successful as long as no exception is thrown.
utils.DEBUG = True

In [72]:
# Make your changes here.
# The root admin token may be supplied through the ACAI_ROOT_TOKEN environment
# variable so that it does not have to be edited into (and saved with) the
# notebook; the literal below is only the fallback used when it is unset.
project_id = "execution_test"
root_token = os.environ.get('ACAI_ROOT_TOKEN', 'AWESOME_ACAI_DEVELOPERS')
project_admin = 'execution_eng'
user = 'execution_eng'

In [3]:
# Create project and user.
# The create_project response carries the project admin token, which is what
# authorizes the create_user call that follows.
admin_token = Project.create_project(project_id, root_token, project_admin)['project_admin_token']

# The new user is logged in automatically.
r = Project.create_user(project_id, admin_token, user)

# You can take note of the new token
token = r['user_token']
print(token)

Running request: 192.168.1.72 7445 credential create_project
POST data {"project_id": "data_lake_test", "admin_token": "AWESOME_ACAI_DEVELOPERS", "project_admin_name": "data_lake"}
Running request: 192.168.1.72 7445 credential create_user
POST data {"project_id": "data_lake_test", "admin_token": "7FiP3KQc8wBBGgRRzS6zMovzw3fLvwzY", "user_name": "data_lake"}
Logged in with token MgmScLEHtGyBE4hClDca4WiIEcWXbLL5
MgmScLEHtGyBE4hClDca4WiIEcWXbLL5


In [2]:
# Next time you can use the token to login.
# Export ACAI_USER_TOKEN with your own token to avoid pasting it into the
# notebook; the literal below is only the fallback used when it is unset.
credentials.login(os.environ.get('ACAI_USER_TOKEN', 'qjbYDwKhg8l4OzJymKGy2ruHOodiFgsg'))

In [3]:
# Upload code: the local example.zip is uploaded under the name 'train_scripts.zip'.
train_code = os.path.join(workspace, 'example.zip')
code_upload_mapping = {train_code: 'train_scripts.zip'}  # {local path: uploaded name}
File.upload(code_upload_mapping)

Running request: credential.acai.mxcao.me 80 storage start_file_upload_session
POST data {"paths": ["train_scripts.zip"], "token": "qjbYDwKhg8l4OzJymKGy2ruHOodiFgsg"}
[('/Users/mxin/Desktop/cmu/2020fall/11632-capstone/acai_repos_new/acaisdk/acaisdk/example/ray-example/example.zip', 'train_scripts.zip')]
r =  <Response [200]>
Uploaded /Users/mxin/Desktop/cmu/2020fall/11632-capstone/acai_repos_new/acaisdk/acaisdk/example/ray-example/example.zip to train_scripts.zip
Running request: credential.acai.mxcao.me 80 storage poll_file_upload_session
GET query {"session_id": 21134, "token": "qjbYDwKhg8l4OzJymKGy2ruHOodiFgsg"}
Running request: credential.acai.mxcao.me 80 storage poll_file_upload_session
GET query {"session_id": 21134, "token": "qjbYDwKhg8l4OzJymKGy2ruHOodiFgsg"}
Running request: credential.acai.mxcao.me 80 storage finish_file_upload_session
POST data {"session_id": 21134, "token": "qjbYDwKhg8l4OzJymKGy2ruHOodiFgsg"}


[('/Users/mxin/Desktop/cmu/2020fall/11632-capstone/acai_repos_new/acaisdk/acaisdk/example/ray-example/example.zip',
  'train_scripts.zip:2')]

In [8]:
# Upload dummy input files and create a new file set on the fly.
# Everything under ./dataset/ is mapped below 'ray_dataset/', uploaded, and
# then registered as the file set 'ray_test_fs'.
input_dir = os.path.join(workspace, 'dataset/')
(
    File.convert_to_file_mapping([input_dir], 'ray_dataset/')
    .files_to_upload
    .upload()
    .as_new_file_set('ray_test_fs')
)

Running request: 192.168.1.72 7445 storage start_file_upload_session
POST data {"paths": ["ray_dataset/train.txt"], "token": "MgmScLEHtGyBE4hClDca4WiIEcWXbLL5"}
[('/Users/mxin/Desktop/cmu/2020fall/11632-capstone/acai_repos_new/acaisdk/acaisdk/example/ray-example/dataset/train.txt', 'ray_dataset/train.txt')]
r =  <Response [200]>
Uploaded /Users/mxin/Desktop/cmu/2020fall/11632-capstone/acai_repos_new/acaisdk/acaisdk/example/ray-example/dataset/train.txt to ray_dataset/train.txt
Running request: 192.168.1.72 7445 storage poll_file_upload_session
GET query {"session_id": 65, "token": "MgmScLEHtGyBE4hClDca4WiIEcWXbLL5"}
Running request: 192.168.1.72 7445 storage poll_file_upload_session
GET query {"session_id": 65, "token": "MgmScLEHtGyBE4hClDca4WiIEcWXbLL5"}
Running request: 192.168.1.72 7445 storage poll_file_upload_session
GET query {"session_id": 65, "token": "MgmScLEHtGyBE4hClDca4WiIEcWXbLL5"}
Running request: 192.168.1.72 7445 storage finish_file_upload_session
POST data {"session_id

{'id': 'ray_test_fs:1', 'files': ['ray_dataset/train.txt:1']}

In [3]:
# You can inspect the uploaded files by listing the root directory
storage_root = '/'
File.list_dir(storage_root)

Running request: credential.acai.mxcao.me 80 storage list_directory
GET query {"directory_path": "/", "token": "MgmScLEHtGyBE4hClDca4WiIEcWXbLL5"}


[{'path': 'ray_dataset', 'version': -1, 'dir': True, 'is_dir': True}]

In [89]:
# Run a training job, it takes ~3 mins to finish

# Shell steps of the job command, joined with ' && ' below so that each step
# only runs when the previous one succeeded.
_command_steps = (
    # start ray on this node and point it at the head node's address
    "ray start --node-ip-address=$MY_POD_IP --num-cpus=0"
    " --address=$RAY_HEAD_SERVICE_HOST:$RAY_HEAD_SERVICE_PORT_REDIS_PRIMARY"
    " --object-manager-port=12345 --node-manager-port=12346",
    # create the folder that 'output_path' refers to
    "mkdir ray_output",
    # run the example and keep a copy of its stdout in the output folder
    "python example.py | tee ./ray_output/output.txt",
)

job_setting = {
    # resources
    "v_cpu": "100m",
    "memory": "512Mi",
    "gpu": "0",
    "nnode": 3,  # the number of workers in addition to the head node
    # runtime
    "framework": "ray",
    "command": " && ".join(_command_steps),
    "container_image": "rayproject/autoscaler",
    # inputs / outputs
    "input_file_set": "ray_test_fs",  # not used since this example does not need an input dataset
    "output_path": "./ray_output/",  # necessary to have a parent folder
    "code": "train_scripts.zip",
    # bookkeeping
    "description": "sample ray job with 4 nodes (1 head + 3 workers)",
    "name": "ray_test_job",
}

# Build a job from the settings dict, then run it; the result is kept in
# train_job so that later cells can query it.
job_request = Job().with_attributes(job_setting)
train_job = job_request.run()

Running request: 192.168.1.72 7445 storage resolve_file_set
GET query {"vague_name": "ray_test_fs", "token": "qjbYDwKhg8l4OzJymKGy2ruHOodiFgsg"}
Running request: 192.168.1.72 7445 job_registry new_job
POST data {"name": "ray_test_job", "input_file_set": "ray_test_fs:1", "output_path": "./ray_output/", "code": "train_scripts.zip", "command": "ray start --node-ip-address=$MY_POD_IP --num-cpus=0 --address=$RAY_HEAD_SERVICE_HOST:$RAY_HEAD_SERVICE_PORT_REDIS_PRIMARY --object-manager-port=12345 --node-manager-port=12346 && mkdir ray_output && python example.py | tee ./ray_output/output.txt", "container_image": "rayproject/autoscaler", "description": "sample ray job with 4 nodes (1 head + 3 workers)", "v_cpu": "100m", "gpu": "0", "memory": "512Mi", "job_status": null, "nnode": 3, "framework": "ray", "token": "qjbYDwKhg8l4OzJymKGy2ruHOodiFgsg"}
{'status': {'message': 'launching'}, 'job': {'name': 'ray_test_job', 'code': 'train_scripts.zip', 'command': 'ray start --node-ip-address=$MY_POD_IP --

In [90]:
# Query the current status of the job started above; re-run this cell to refresh it.
job_state = train_job.status()
job_state

Running request: 192.168.1.72 7445 job_monitor job_status
POST data {"ids": [51], "token": "qjbYDwKhg8l4OzJymKGy2ruHOodiFgsg"}


<JobStatus.FINISHED: 6>

In [91]:
# Now inspect the output by listing the job's output directory
output_dir = '/ray_output'
File.list_dir(output_dir)

Running request: 192.168.1.72 7445 storage list_directory
GET query {"directory_path": "/ray_output", "token": "qjbYDwKhg8l4OzJymKGy2ruHOodiFgsg"}


[{'path': 'output.txt', 'version': 3, 'dir': False, 'is_dir': False}]

In [92]:
# Download the job's output file. The key is the path requested from storage;
# the value is presumably the local destination -- confirm against the SDK.
download_mapping = {'/ray_output/output.txt': './output.txt'}
File.download(download_mapping)

Running request: 192.168.1.72 7445 storage download_file
GET query {"path": "/ray_output/output.txt", "token": "qjbYDwKhg8l4OzJymKGy2ruHOodiFgsg"}
<Response [200]>
