In [None]:
import os
import ray
import ray.util
import random

In [3]:
from abc import ABC, abstractmethod

class RDDPartition(ABC):
    """Abstract source of the elements belonging to one partition."""

    @abstractmethod
    def generator(self):
        """Return an iterable over this partition's elements.

        Concrete partitions must override this method; the base
        implementation does nothing and returns None.
        """

class RDDIndexedPartition(RDDPartition):
    """Partition that produces the consecutive integers in ``[b, e)``."""

    def __init__(self, b, e):
        # b is the first index produced; e is one past the last (range semantics).
        self.b, self.e = b, e

    def generator(self):
        """Return a ``range`` spanning this partition's indices."""
        start, stop = self.b, self.e
        return range(start, stop)

class RDDDataPartition(RDDPartition):
    """Partition whose contents are retrieved with ``ray.get``."""

    def __init__(self, data):
        # Kept exactly as given; generator() hands it to ray.get().
        self.data = data

    def generator(self):
        """Fetch the partition's contents and return an iterator over them."""
        return iter(ray.get(self.data))

class RDDTransform(ABC):
    """Abstract per-element stage that rewrites a stream of elements."""

    @abstractmethod
    def generator(self, g):
        """Return a new iterable derived from the iterable ``g``.

        Concrete transforms must override this method; the base
        implementation does nothing and returns None.
        """

class RDDMap(RDDTransform):
    """Transform that applies a function to every element of a stream."""

    def __init__(self, f):
        # f: one-argument callable applied to each element.
        self.f = f

    def generator(self, g):
        """Return a lazy iterator producing ``f(e)`` for each ``e`` in ``g``."""
        return map(self.f, g)

class RDDFilter(RDDTransform):
    """Transform that keeps only the elements accepted by a predicate."""

    def __init__(self, f):
        # f: one-argument callable; an element is kept when f(element) is truthy.
        self.f = f

    def generator(self, g):
        """Return a lazy iterator over the elements of ``g`` that pass ``f``."""
        keep = self.f
        return (item for item in g if keep(item))

@ray.remote
def rdd_collect_part(part, transforms):
    """Evaluate one partition through the transform chain and return a list.

    ``part`` supplies the initial stream via ``part.generator()``; each entry
    of ``transforms`` wraps the stream, in order, via ``generator(stream)``.
    """
    stream = part.generator()
    for transform in transforms:
        stream = transform.generator(stream)
    # Materialise the lazy pipeline into a list for the caller.
    return list(stream)

@ray.remote
def rdd_count_part(part, transforms):
    """Count the elements one partition yields after all transforms.

    ``part`` supplies the initial stream via ``part.generator()``; each entry
    of ``transforms`` wraps the stream, in order, via ``generator(stream)``.
    """
    stream = part.generator()
    for transform in transforms:
        stream = transform.generator(stream)
    # Drain the pipeline, tallying elements without storing them.
    total = 0
    for _ in stream:
        total += 1
    return total

@ray.remote
def rdd_reduce_part(part, transforms, z, f):
    """Left-fold one partition's transformed elements with ``f``.

    The fold starts from ``z`` and, for every element ``e`` of the pipeline,
    replaces the accumulator with ``f(accumulator, e)``. The final
    accumulator is returned (``z`` itself when the pipeline is empty).
    """
    stream = part.generator()
    for transform in transforms:
        stream = transform.generator(stream)

    accumulator = z
    for item in stream:
        accumulator = f(accumulator, item)
    return accumulator
    
class SimpleRayRDD:
    """Minimal RDD-style dataset whose partitions are evaluated as Ray tasks.

    An instance holds a list of partitions and a list of transforms.
    ``map`` and ``filter`` return a new instance with one more transform;
    ``collect``, ``count`` and ``reduce`` launch one remote task per
    partition and combine the per-partition results locally.
    """

    # Inputs are never planned into more partitions than one per this many
    # elements, so small inputs end up in a single partition.
    _MIN_PARTITION_SIZE = 1000

    def __init__(self, np=2, indexed=None, data=None, parts=None, transforms=None):
        """Build an RDD in one of three modes (checked in this order).

        Args:
            np: Requested number of partitions (upper bound) for the
                ``indexed`` and ``data`` modes.
            indexed: If given, the RDD holds the integers ``0 .. int(indexed)-1``.
            data: If given, a sliceable sequence whose chunks are stored with
                ``ray.put``.
            parts: If given, an existing list of partitions to build on; this
                mode is used internally by ``map`` and ``filter``.
            transforms: Transform list that accompanies ``parts``; treated as
                empty when omitted.

        Raises:
            ValueError: If none of ``parts``, ``indexed`` or ``data`` is given.
        """
        if parts is not None:
            # Building on a previous RDD; typically called internally.
            self.parts = parts
            # Fall back to an empty pipeline so map()/filter() and the actions
            # still work when only partitions were supplied.
            self.transforms = transforms if transforms is not None else []
        elif indexed is not None:
            # Base case: a new RDD over a range of indices.
            self.transforms = []
            self.parts = [
                RDDIndexedPartition(begin, end)
                for begin, end in self._partition_bounds(int(indexed), np)
            ]
        elif data is not None:
            # Base case: a new RDD over caller-provided data.
            self.transforms = []
            # NOTE(review): wrapping the chunk in a list and taking element 0
            # of the result presumably relies on the connected Ray client
            # returning one reference per list element from put() -- confirm
            # against the Ray version in use before changing this expression.
            self.parts = [
                RDDDataPartition(ray.put([data[begin:end]])[0])
                for begin, end in self._partition_bounds(len(data), np)
            ]
        else:
            raise ValueError("undefined RDD partition mode")

    @staticmethod
    def _partition_bounds(n, np):
        """Split ``range(n)`` into at most ``np`` contiguous ``(begin, end)`` pairs.

        The partition count is capped at ``n // _MIN_PARTITION_SIZE`` (but is
        at least 1). Returns an empty list when ``n`` is not positive.
        """
        if n <= 0:
            return []
        count = max(1, min(np, n // SimpleRayRDD._MIN_PARTITION_SIZE))
        # Ceiling division: a remainder is spread into the partitions instead
        # of spilling over into an extra, tiny trailing partition.
        size = -(-n // count)
        return [(begin, min(n, begin + size)) for begin in range(0, n, size)]

    def map(self, f):
        """Return a new RDD that additionally applies ``f`` to every element."""
        # Copy both lists so the derived RDD never mutates this one.
        return SimpleRayRDD(parts=self.parts[:],
                            transforms=self.transforms + [RDDMap(f)])

    def filter(self, f):
        """Return a new RDD that additionally keeps only elements where ``f(e)`` is truthy."""
        return SimpleRayRDD(parts=self.parts[:],
                            transforms=self.transforms + [RDDFilter(f)])

    def collect(self):
        """Evaluate every partition remotely and return all elements in one list.

        Results are concatenated in partition order.
        """
        pending = [rdd_collect_part.remote(p, self.transforms) for p in self.parts]
        data = []
        for chunk in ray.get(pending):
            data.extend(chunk)
        return data

    def count(self):
        """Return the total number of elements across all partitions."""
        pending = [rdd_count_part.remote(p, self.transforms) for p in self.parts]
        return sum(ray.get(pending))

    def reduce(self, z, f):
        """Fold all elements with ``f``, starting from ``z``.

        ``z`` seeds the fold inside every partition and again when the
        per-partition results are combined, so it should be an identity for
        ``f`` (for example ``0`` for addition).
        """
        partials = ray.get(
            [rdd_reduce_part.remote(p, self.transforms, z, f) for p in self.parts]
        )
        result = z
        for partial in partials:
            result = f(result, partial)
        return result

In [4]:
from ray.util.client import ray as rayclient
# Drop any existing client connection before connecting again
# (presumably so this cell can be re-run in the same kernel).
if rayclient.is_connected():
    ray.util.disconnect()

# The head node's host name comes from the RAY_CLUSTER environment variable
# (a KeyError is raised here if it is unset); the connection uses port 10001.
# connect() is the cell's final expression, so its return value is displayed.
ray.util.connect('{ray_head}:10001'.format(ray_head=os.environ['RAY_CLUSTER']))

The autoscaler failed with the following error:
Terminated with signal 15
  File "/opt/ray/ray-KozB7GFR/bin/ray-operator", line 8, in <module>
    sys.exit(main())
  File "/opt/ray/ray-KozB7GFR/lib/python3.6/site-packages/ray/ray_operator/operator.py", line 154, in main
    handle_event(event_type, cluster_cr, cluster_name)
  File "/opt/ray/ray-KozB7GFR/lib/python3.6/site-packages/ray/ray_operator/operator.py", line 113, in handle_event
    cluster_action(event_type, cluster_cr, cluster_name)
  File "/opt/ray/ray-KozB7GFR/lib/python3.6/site-packages/ray/ray_operator/operator.py", line 127, in cluster_action
    ray_clusters[cluster_name].create_or_update()
  File "/opt/ray/ray-KozB7GFR/lib/python3.6/site-packages/ray/ray_operator/operator.py", line 50, in create_or_update
    self.do_in_subprocess(self._create_or_update)
  File "/opt/ray/ray-KozB7GFR/lib/python3.6/site-packages/ray/ray_operator/operator.py", line 40, in do_in_subprocess
    self.subprocess.start()
  File "/usr/lib64/py

{'num_clients': 1,
 'python_version': '3.6.8',
 'ray_version': '2.0.0.dev0',
 'ray_commit': '4357055305395ea309813851a539381bc7c32138',
 'protocol_version': '2020-02-22'}

In [5]:
# Small smoke test: keep the even numbers, then add one to each.
rdd = (
    SimpleRayRDD(data=[1, 2, 3, 4], np=1)
    .filter(lambda x: x % 2 == 0)
    .map(lambda x: x + 1)
)
rdd.collect()

[3, 5]

In [6]:
# Number of elements that remain after rdd's filter and map steps.
rdd.count()

2

In [7]:
# Sum the elements; 0 is the starting value of the fold.
rdd.reduce(0, lambda acc, value: acc + value)

8

In [8]:
def ray_pi(n=1000, k=10):
    """Estimate pi by Monte Carlo sampling of ``n * k`` random points.

    Each index is mapped to a point drawn uniformly from the square
    [-1, 1] x [-1, 1]; the points with x*x + y*y <= 1 are counted, and
    4 times their share of all points is returned.

    Args:
        n: Multiplied by ``k`` to give the total number of samples.
        k: Number of partitions requested from SimpleRayRDD.
    """
    total = n * k

    def sample_point(_):
        # The index itself is ignored; it only drives the number of samples.
        return (random.uniform(-1, 1), random.uniform(-1, 1))

    def inside_unit_circle(p):
        return p[0]*p[0] + p[1]*p[1] <= 1

    hits = (
        SimpleRayRDD(indexed=total, np=k)
        .map(sample_point)
        .filter(inside_unit_circle)
        .count()
    )
    return 4 * hits / total

In [12]:
%%time
# Time a pi estimate over n * k = 10,000,000 samples with k = 10 requested partitions.
ray_pi(n = 1000000, k = 10)

CPU times: user 42.6 ms, sys: 4.95 ms, total: 47.5 ms
Wall time: 5.31 s


3.14149