In [6]:
import ray 
import os

def stop_clark_cluster():
    """Shut down the local Ray session and stop Ray on every worker host.

    Calls ``ray.shutdown()`` in this process and then runs
    ``/home/john/venv/bin/ray stop`` over ssh on ellen, audrey and edna, in
    that order. Stopping is best-effort: a host whose command fails is
    reported with a printed warning instead of an exception, so the remaining
    hosts are still stopped.

    Returns
    -------
    None
    """
    ray.shutdown()
    for host in ('ellen', 'audrey', 'edna'):
        # os.system returns the command's status; non-zero means that ssh or
        # the remote `ray stop` did not succeed on this host.
        status = os.system('ssh {} /home/john/venv/bin/ray stop'.format(host))
        if status != 0:
            print('warning: `ray stop` on {} returned status {}'.format(host, status))
    
def start_clark_cluster(clark_cpus = 4, ellen_cpus = 32, audrey_cpus = 32, edna_cpus = 32, 
                        clark_gpus = 0, ellen_gpus = 2, audrey_gpus = 1, edna_gpus = 1):
    """Restart the Ray cluster: head node here, workers on ellen, edna, audrey.

    Any running cluster is stopped first via ``stop_clark_cluster()``. Ray is
    then initialised in this process and each worker is asked over ssh to join
    the head node's redis address.

    Parameters
    ----------
    clark_cpus, clark_gpus: int
        CPU / GPU counts passed to ``ray.init`` for the head node.
    ellen_cpus, audrey_cpus, edna_cpus: int
        Values passed as ``--num-cpus`` to ``ray start`` on each worker.
    ellen_gpus, audrey_gpus, edna_gpus: int
        Values passed as ``--num-gpus`` to ``ray start`` on each worker.

    Returns
    -------
    The value returned by ``ray.init`` (indexed with ``'redis_address'`` here).
    """
    stop_clark_cluster()
    # TODO: the redis password is hard-coded in the notebook; move it to an
    # environment variable or prompt before sharing this file.
    # NOTE(review): the later revision of this function in this notebook passes
    # `_redis_password` and no `redis_port` -- confirm which spelling matches
    # the installed Ray version.
    config = ray.init(redis_port=6379, redis_password="gn8GWVrMJ3cSX4", num_cpus = clark_cpus, num_gpus = clark_gpus)
    workers = (('ellen', ellen_gpus, ellen_cpus),
               ('edna', edna_gpus, edna_cpus),
               ('audrey', audrey_gpus, audrey_cpus))
    for host, n_gpus, n_cpus in workers:
        # Use the absolute path to the ray executable, as stop_clark_cluster
        # does, instead of relying on the PATH of the ssh session.
        command = ("ssh {} /home/john/venv/bin/ray start --address={} "
                   "--redis-password='gn8GWVrMJ3cSX4' --num-gpus={} --num-cpus={}"
                   ).format(host, config['redis_address'], n_gpus, n_cpus)
        status = os.system(command)
        if status != 0:
            # Report the failure but keep going so the other workers still join.
            print('warning: `ray start` on {} returned status {}'.format(host, status))
    return config

# distributed_computing module example

In [1]:
import pipecaster as pc
# Create the head node with 4 CPUs and no GPUs, then attach the three remote
# workers. ray_dir is the same venv bin directory used by the raw
# `ssh <host> /home/john/venv/bin/ray ...` cells elsewhere in this notebook.
# Requested totals: 4 + 3*32 = 100 CPUs and 0 + 2 + 1 + 1 = 4 GPUs.
cluster = pc.HeadNode(n_cpus=4, n_gpus=0, object_store_memory='auto')
cluster.connect_remote_node('ellen', ray_dir='/home/john/venv/bin/',
                       n_cpus = 32, n_gpus = 2, object_store_memory='auto')
cluster.connect_remote_node('audrey', ray_dir='/home/john/venv/bin/',
                       n_cpus = 32, n_gpus = 1, object_store_memory='auto')
cluster.connect_remote_node('edna', ray_dir='/home/john/venv/bin/',
                       n_cpus = 32, n_gpus = 1, object_store_memory='auto')

2020-12-07 08:51:57,517	INFO services.py:1092 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


In [2]:
# Wait ~1 second for the resource counts to be updated before querying them.
print('resources: {} CPUs and {} GPUS'.format(cluster.count_cpus(), cluster.count_gpus()))

resources: 100.0 CPUs and 4.0 GPUS


In [3]:
# Restart the cluster. In the recorded run Ray logs a fresh dashboard line and
# the next cell reports the same resource totals as before the restart.
cluster.restart()

2020-12-07 08:52:11,617	INFO services.py:1092 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


In [5]:
# Wait ~1 second for the resource counts to be updated before querying them.
print('resources: {} CPUs and {} GPUS'.format(cluster.count_cpus(), cluster.count_gpus()))

resources: 100.0 CPUs and 4.0 GPUS


In [1]:
import pipecaster as pc
import pipecaster.ray_backend as rb

# Build a Ray-backed distributor, ask it to attach the remote computer
# 'ellen', and register it with pipecaster.
# NOTE(review): app_path here is '/usr/venv/bin/', whereas the other cells in
# this notebook use '/home/john/venv/bin/'. The following cells report only
# 4 CPUs and a single node, so ellen presumably did not join in this run --
# confirm the path.
distributor = rb.RayDistributor()
distributor.connect_remote_computer(computer_id='ellen',
                           app_path='/usr/venv/bin/',
                           n_cpus='all', n_gpus=0,
                           object_store_memory='auto')
pc.set_distributor(distributor)

2021-01-15 08:41:32,699	INFO services.py:1092 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


In [2]:
# CPU count seen by the distributor; the recorded value is 4.
distributor.count_cpus()

4

In [3]:
import ray
# List the nodes in the current Ray session; the recorded output contains a
# single entry (hostname 'mski1413').
ray.nodes()

[{'NodeID': '6ac6e7edda67acb882efaa36f4159ea0ade85b43',
  'Alive': True,
  'NodeManagerAddress': '192.168.1.251',
  'NodeManagerHostname': 'mski1413',
  'NodeManagerPort': 54994,
  'ObjectManagerPort': 51692,
  'ObjectStoreSocketName': '/tmp/ray/session_2021-01-15_08-41-32_121132_4105/sockets/plasma_store',
  'RayletSocketName': '/tmp/ray/session_2021-01-15_08-41-32_121132_4105/sockets/raylet',
  'MetricsExportPort': 63806,
  'alive': True,
  'Resources': {'object_store_memory': 32.0,
   'node:192.168.1.251': 1.0,
   'CPU': 4.0,
   'memory': 93.0}}]

# from scratch

In [7]:
import ray
import os

# Start Ray with default settings; `config['redis_address']` is used by the
# following cells as the --address for the workers.
config = ray.init()

2020-12-12 09:02:07,252	INFO services.py:1092 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


In [8]:
# Ask ellen to join the session started above with 2 GPUs and 32 CPUs.
# The recorded return value is 0.
os.system("ssh ellen /home/john/venv/bin/ray start --address={} --num-gpus={} --num-cpus={}".format(config['redis_address'], 2, 32))

0

In [9]:
# Ask audrey to join the session started above with 1 GPU and 32 CPUs.
# The recorded return value is 0.
os.system("ssh audrey /home/john/venv/bin/ray start --address={} --num-gpus={} --num-cpus={}".format(config['redis_address'], 1, 32))

0

In [10]:
# Ask edna to join the session started above with 1 GPU and 32 CPUs.
# The recorded return value is 0.
os.system("ssh edna /home/john/venv/bin/ray start --address={} --num-gpus={} --num-cpus={}".format(config['redis_address'], 1, 32))

0

# shutdown cluster

In [4]:
import ray
import os
# Shut down the local Ray session, then stop Ray on each worker over ssh.
# Only the last call's return value is displayed by the notebook; in the
# recorded run it is 256 (non-zero) for the audrey command.
ray.shutdown()
os.system("ssh edna /home/john/venv/bin/ray stop")
os.system("ssh ellen /home/john/venv/bin/ray stop")
os.system("ssh audrey /home/john/venv/bin/ray stop")

256

In [1]:
import numpy as np

from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectPercentile
from sklearn.feature_selection import f_classif
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.datasets import make_classification

import pipecaster as pc

# Two kinds of feature matrix, both 500 samples x 30 features:
# X_rand is uniform random noise generated independently of y, while X_inf
# comes from make_classification with 15 informative features.
# NOTE(review): no random seed is set (random_state=None), so the data differ
# between runs.
X_rand = np.random.rand(500, 30)
X_inf, y = make_classification(n_samples=500, n_features=30, 
                               n_informative=15, class_sep=3, random_state=None)

# Six input channels: indices 2, 4 and 5 carry X_inf, the rest carry X_rand.
Xs = [X_rand, X_rand, X_inf, X_rand, X_inf, X_inf]

clf = pc.MultichannelPipeline(n_channels=6)

# In the rendered layout below, each of these three layers appears on all
# six channels.
clf.add_layer(SimpleImputer())
clf.add_layer(StandardScaler())
clf.add_layer(SelectPercentile(percentile=25))
# In the rendered layout, SelectKBestScores spans channels 0-4 and channel 5
# passes through this layer.
clf.add_layer(5, pc.SelectKBestScores(feature_scorer=f_classif, aggregator=np.mean, k=2)) 
# In the rendered layout, SelectKBestModels spans channels 0-4 and
# LogisticRegression sits on channel 5.
clf.add_layer(5, pc.SelectKBestModels(predictors=KNeighborsClassifier(), k=1), 
              1, LogisticRegression()) 
# Final layer: a single MultichannelPredictor spanning all six channels.
clf.add_layer(pc.MultichannelPredictor(SVC()))
# Last expression: display the pipeline layout table (the pipeline is not fit
# in this cell).
clf

Unnamed: 0_level_0,layer_0,out_0,layer_1,out_1,layer_2,out_2,layer_3,out_3,layer_4,out_4,layer_5,out_5
channel,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
0,SimpleImputer(),→,StandardScaler(),→,SelectPercentile(),→,SelectKBestScores(),→,SelectKBestModels(),→,MultichannelPredictor(),→
1,SimpleImputer(),→,StandardScaler(),→,SelectPercentile(),→,↓,→,↓,→,↓,→
2,SimpleImputer(),→,StandardScaler(),→,SelectPercentile(),→,↓,→,↓,→,↓,→
3,SimpleImputer(),→,StandardScaler(),→,SelectPercentile(),→,↓,→,↓,→,↓,→
4,SimpleImputer(),→,StandardScaler(),→,SelectPercentile(),→,↓,→,↓,→,↓,→
5,SimpleImputer(),→,StandardScaler(),→,SelectPercentile(),→,→,→,LogisticRegression(),→,↓,→


In [None]:


    transform_method_name: str or None, default=None
        Set the name of the prediction method used when transform or
        fit_transform are called. If None, the method will be selected
        automatically by the precedence defined in the transform_wrapper
        module.
        
    internal_cv: None, int, or callable, default=5
        Set the internal cv training method for predictors.
        If 1: Internal cv training is inactivated.
        If int > 1: StratifiedKFold(n_splits=internal_cv) for classifiers and
            KFold(n_splits=internal_cv) for regressors.
        If None: The default value of 5 is used.
        If callable: Assumes scikit-learn interface like KFold.
                
    n_processes: int or 'max', default=1
        If 1: Run all split computations in a single process.
        If 'max': Run each split in a different process, using all available
            CPUs
        If int > 1: Run each split in a different process, using up to
            n_processes number of CPUs
            
        """
        Parameters
        ----------
        Xs: list of [ndarray.shape(n_samples, n_features) or None]
            List of feature matrix inputs.
        y: list/array of length n_samples, default=None
            Targets for supervised ML.
        fit_params: dict, default=None
            Auxiliary parameters to pass to the fit method of the predictor.
        """
    
    
    

notes:
Ray 1.0.1.post1 failed to establish the cluster,
so I upgraded to Ray 1.1.


In [3]:
import ray 
import os

def stop_clark_cluster():
    """Shut down the local Ray session and stop Ray on every worker host.

    Calls ``ray.shutdown()`` in this process and then runs
    ``/home/john/venv/bin/ray stop`` over ssh on ellen, audrey and edna, in
    that order. Stopping is best-effort: a host whose command fails is
    reported with a printed warning instead of an exception, so the remaining
    hosts are still stopped.

    Returns
    -------
    None
    """
    ray.shutdown()
    for host in ('ellen', 'audrey', 'edna'):
        # os.system returns the command's status; non-zero means that ssh or
        # the remote `ray stop` did not succeed on this host.
        status = os.system('ssh {} /home/john/venv/bin/ray stop'.format(host))
        if status != 0:
            print('warning: `ray stop` on {} returned status {}'.format(host, status))
    
def start_clark_cluster(clark_cpus = 4, ellen_cpus = 32, audrey_cpus = 32, edna_cpus = 32, 
                        clark_gpus = 0, ellen_gpus = 2, audrey_gpus = 1, edna_gpus = 1):
    """Restart the Ray cluster: head node here, workers on ellen, edna, audrey.

    Any running cluster is stopped first via ``stop_clark_cluster()``. Ray is
    then initialised in this process and each worker is asked over ssh to join
    the head node's redis address.

    Parameters
    ----------
    clark_cpus, clark_gpus: int
        CPU / GPU counts passed to ``ray.init`` for the head node.
    ellen_cpus, audrey_cpus, edna_cpus: int
        Values passed as ``--num-cpus`` to ``ray start`` on each worker.
    ellen_gpus, audrey_gpus, edna_gpus: int
        Values passed as ``--num-gpus`` to ``ray start`` on each worker.

    Returns
    -------
    The value returned by ``ray.init`` (indexed with ``'redis_address'`` here).
    """
    stop_clark_cluster()
    # TODO: the redis password is hard-coded in the notebook; move it to an
    # environment variable or prompt before sharing this file.
    config = ray.init(_redis_password="gn8GWVrMJ3cSX4", num_cpus = clark_cpus, num_gpus = clark_gpus)
    workers = (('ellen', ellen_gpus, ellen_cpus),
               ('edna', edna_gpus, edna_cpus),
               ('audrey', audrey_gpus, audrey_cpus))
    for host, n_gpus, n_cpus in workers:
        # Use the absolute path to the ray executable, as stop_clark_cluster
        # does, instead of relying on the PATH of the ssh session.
        command = ("ssh {} /home/john/venv/bin/ray start --address={} "
                   "--redis-password='gn8GWVrMJ3cSX4' --num-gpus={} --num-cpus={}"
                   ).format(host, config['redis_address'], n_gpus, n_cpus)
        status = os.system(command)
        if status != 0:
            # Report the failure but keep going so the other workers still join.
            print('warning: `ray start` on {} returned status {}'.format(host, status))
    return config

In [4]:
# An explicit stop is not needed here: start_clark_cluster() calls
# stop_clark_cluster() itself before initialising Ray.
# stop_clark_cluster()
start_clark_cluster()

File descriptor limit 256 is too low for production servers and may result in connection errors. At least 8192 is recommended. --- Fix with 'ulimit -n 8192'
2021-01-15 09:23:32,794	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'node_ip_address': '192.168.1.251',
 'raylet_ip_address': '192.168.1.251',
 'redis_address': '192.168.1.251:6379',
 'object_store_address': '/tmp/ray/session_2021-01-15_09-23-32_081742_1075/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2021-01-15_09-23-32_081742_1075/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2021-01-15_09-23-32_081742_1075',
 'metrics_export_port': 57469,
 'node_id': '16fca5251895b123e4a0fed8c0b49c3c70ce7eb6'}

In [5]:
import ray
# List the nodes in the current Ray session. The recorded output contains only
# the local node (hostname 'mski1413'), i.e. no worker had joined at that point.
ray.nodes()

[{'NodeID': '16fca5251895b123e4a0fed8c0b49c3c70ce7eb6',
  'Alive': True,
  'NodeManagerAddress': '192.168.1.251',
  'NodeManagerHostname': 'mski1413',
  'NodeManagerPort': 53090,
  'ObjectManagerPort': 50667,
  'ObjectStoreSocketName': '/tmp/ray/session_2021-01-15_09-23-32_081742_1075/sockets/plasma_store',
  'RayletSocketName': '/tmp/ray/session_2021-01-15_09-23-32_081742_1075/sockets/raylet',
  'MetricsExportPort': 57469,
  'alive': True,
  'Resources': {'object_store_memory': 43.0,
   'CPU': 4.0,
   'memory': 127.0,
   'node:192.168.1.251': 1.0}}]

In [7]:
# Installed Ray version for this run (recorded value: '1.1.0').
ray.__version__

'1.1.0'

In [1]:
import pipecaster as pc
import pipecaster.ray_backend as rb

# Create the Ray-backed distributor; remote computers are attached and the
# distributor is registered with pipecaster in the following cells.
distributor = rb.RayDistributor()


In [2]:
# Attach ellen (all of its CPUs, 2 GPUs) using the ray executable under
# /home/john/venv/bin/, then make this distributor the one pipecaster uses.
distributor.connect_remote_computer(computer_id='ellen',
                           app_path='/home/john/venv/bin/',
                           n_cpus='all', n_gpus=2,
                           object_store_memory='auto')
pc.set_distributor(distributor)

File descriptor limit 256 is too low for production servers and may result in connection errors. At least 8192 is recommended. --- Fix with 'ulimit -n 8192'
2021-01-15 15:46:46,403	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


In [3]:
# Attach audrey (all of its CPUs, 1 GPU) with the same app_path as ellen.
distributor.connect_remote_computer(computer_id='audrey',
                           app_path='/home/john/venv/bin/',
                           n_cpus='all', n_gpus=1,
                           object_store_memory='auto')

In [4]:
# Attach edna (all of its CPUs, 1 GPU) with the same app_path as ellen.
distributor.connect_remote_computer(computer_id='edna',
                           app_path='/home/john/venv/bin/',
                           n_cpus='all', n_gpus=1,
                           object_store_memory='auto')

In [6]:
# Shut the distributor down. The next cell's RaySystemError ("Ray has not been
# started yet") indicates that the local Ray session is no longer running
# after this call.
distributor.shutdown()

In [7]:
import ray
# Check the node list after shutdown; in the recorded run this raises
# RaySystemError because Ray is no longer started.
ray.nodes()

RaySystemError: System error: Ray has not been started yet. You can start Ray with 'ray.init()'.

In [25]:
import ray
import os
# Shut down the local Ray session, then stop Ray on each worker over ssh.
# Only the last call's return value is displayed; in the recorded run it is 0.
ray.shutdown()
os.system("ssh edna /home/john/venv/bin/ray stop")
os.system("ssh ellen /home/john/venv/bin/ray stop")
os.system("ssh audrey /home/john/venv/bin/ray stop")

0

In [2]:
import ray
# Start the head node with 4 CPUs, no GPUs and an explicit redis password that
# the workers must also pass to `ray start`.
# NOTE(review): the password is hard-coded in the notebook; consider reading
# it from the environment before sharing this file.
config = ray.init(_redis_password="gn8GWVrMJ3cSX4", num_cpus = 4, num_gpus = 0)
# Show the node list right after init (recorded output: local node only).
ray.nodes()

File descriptor limit 256 is too low for production servers and may result in connection errors. At least 8192 is recommended. --- Fix with 'ulimit -n 8192'
2021-01-15 14:21:52,437	INFO services.py:1173 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


[{'NodeID': 'a1cd2a0c02cbabef9841e293a97b743495178279',
  'Alive': True,
  'NodeManagerAddress': '192.168.1.251',
  'NodeManagerHostname': 'mski1413',
  'NodeManagerPort': 54925,
  'ObjectManagerPort': 53715,
  'ObjectStoreSocketName': '/tmp/ray/session_2021-01-15_14-21-51_754117_1871/sockets/plasma_store',
  'RayletSocketName': '/tmp/ray/session_2021-01-15_14-21-51_754117_1871/sockets/raylet',
  'MetricsExportPort': 60254,
  'alive': True,
  'Resources': {'object_store_memory': 35.0,
   'CPU': 4.0,
   'node:192.168.1.251': 1.0,
   'memory': 103.0}}]

In [4]:
# Display the dict returned by ray.init(); its 'redis_address' entry is the
# address the workers are pointed at in the following cells.
# NOTE(review): the modprobe line below looks like a pasted error message;
# where it came from is not shown in this notebook -- confirm or remove.
config
# modprobe: FATAL: Module nvidia not found in directory /lib/modules/5.3.0-46-generic

{'node_ip_address': '192.168.1.251',
 'raylet_ip_address': '192.168.1.251',
 'redis_address': '192.168.1.251:6379',
 'object_store_address': '/tmp/ray/session_2021-01-15_14-21-51_754117_1871/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2021-01-15_14-21-51_754117_1871/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2021-01-15_14-21-51_754117_1871',
 'metrics_export_port': 60254,
 'node_id': 'a1cd2a0c02cbabef9841e293a97b743495178279'}

In [11]:
# Ask ellen to join the head node at 192.168.1.251:6379 with 2 GPUs / 32 CPUs.
# The recorded return value is 32256 (non-zero; 32256 == 126 * 256), so the
# command did not succeed in this run.
os.system("ssh ellen /home/john/venv/bin/ray start --address=192.168.1.251:6379 --redis-password='gn8GWVrMJ3cSX4' --num-gpus=2 --num-cpus=32")

32256

In [None]:
# Command for joining audrey, kept as a bare string: this cell does not run it
# (there is no os.system call around it).
"ssh audrey /home/john/venv/bin/ray start --address=192.168.1.251:6379 --redis-password='gn8GWVrMJ3cSX4' --num-cpus=32 --num-gpus=1"

In [None]:
# Command for joining edna, kept as a bare string: this cell does not run it
# (there is no os.system call around it).
"ssh edna /home/john/venv/bin/ray start --address=192.168.1.251:6379 --redis-password='gn8GWVrMJ3cSX4' --num-cpus=32 --num-gpus=1"

In [7]:
# List the nodes again; the recorded output includes entries for the remote
# hostnames 'audrey' and 'Ellen'.
ray.nodes()

[{'NodeID': '04cc5a2809b35106a9eb0de79586ff85fe3e3ed8',
  'Alive': True,
  'NodeManagerAddress': '192.168.1.2',
  'NodeManagerHostname': 'audrey',
  'NodeManagerPort': 59190,
  'ObjectManagerPort': 38021,
  'ObjectStoreSocketName': '/tmp/ray/session_2021-01-15_14-21-51_754117_1871/sockets/plasma_store',
  'RayletSocketName': '/tmp/ray/session_2021-01-15_14-21-51_754117_1871/sockets/raylet',
  'MetricsExportPort': 62166,
  'alive': True,
  'Resources': {'object_store_memory': 197.0,
   'GPU': 1.0,
   'CPU': 32.0,
   'node:192.168.1.2': 1.0,
   'memory': 669.0}},
 {'NodeID': 'd3110562e2dc8cf803b6d433a735b3444a4ef70b',
  'Alive': True,
  'NodeManagerAddress': '192.168.1.3',
  'NodeManagerHostname': 'Ellen',
  'NodeManagerPort': 58293,
  'ObjectManagerPort': 37343,
  'ObjectStoreSocketName': '/tmp/ray/session_2021-01-15_14-21-51_754117_1871/sockets/plasma_store',
  'RayletSocketName': '/tmp/ray/session_2021-01-15_14-21-51_754117_1871/sockets/raylet',
  'MetricsExportPort': 58391,
  'alive'