In [1]:
import ray

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
def hi():
    import os
    import socket
    return f"Running on {socket.gethostname()} in pid {os.getpid()}"

hi()

'Running on 4688763a1b47 in pid 22128'

In [3]:
@ray.remote
def remote_hi():
    import os
    import socket
    return f"Running on {socket.gethostname()} in pid {os.getpid()}"

future = remote_hi.remote()


2023-07-08 07:44:14,473	INFO worker.py:1627 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


In [4]:
ray.get(future)

'Running on 4688763a1b47 in pid 22285'

In [5]:
import timeit



def slow_task(x):
    import time
    print(x)
    time.sleep(0.5) # Do something sciency/business
    return x

@ray.remote
def remote_task(x):
    xstr = str(x) + "_remote"
    #print(xstr)
    return slow_task(xstr) 


In [6]:
things = range(10)

very_slow_result = map(slow_task, things)
slow_time = timeit.timeit(lambda: list(very_slow_result), number=1)

slowish_result = map(lambda x: remote_task.remote(x), things)
fast_time = timeit.timeit(lambda: list(ray.get(list(slowish_result))), number=1)
print(f"In sequence {slow_time}, in parallel {fast_time}")


0
1
2
3
4
5
6
7
8
9
[2m[36m(remote_task pid=22285)[0m 0_remote
In sequence 5.007652811000298, in parallel 1.1269431339997027


In [7]:
list(very_slow_result)

[]

In [9]:
urls = ray.data.from_items([
    "https://github.com/scalingpythonml/scalingpythonml",
    "https://github.com/ray-project/ray"])

def fetch_page(url):
    import requests
    f = requests.get(url)
    return f.text

pages = urls.map(fetch_page)
# pages.take(1)

In [12]:
# words = pages.flat_map(lambda x: x.split(" ")).map(lambda w: (w, 1))
# grouped_words = words.groupby(lambda wc: wc[0])

In [13]:
@ray.remote
class HelloWorld(object):
    def __init__(self):
        self.value = 0
    def greet(self):
        self.value += 1
        return f"Hi user #{self.value}"

# Make an instance of the actor
hello_actor = HelloWorld.remote()

# Call the actor
print(ray.get(hello_actor.greet.remote()))
print(ray.get(hello_actor.greet.remote()))

Hi user #1
Hi user #2


    [state-dump] 	InternalPubSubGcsService.grpc_client.GcsSubscriberPoll - 7 total (1 active), CPU time: mean = 89.979 us, total = 629.852 us
    [state-dump] 	NodeManagerService.grpc_server.PinObjectIDs - 7 total (0 active), CPU time: mean = 236.199 us, total = 1.653 ms
    [state-dump] 	Subscriber.HandlePublishedMessage_WORKER_OBJECT_LOCATIONS_CHANNEL - 5 total (0 active), CPU time: mean = 3.135 us, total = 15.676 us
    [state-dump] 	Subscriber.HandlePublishedMessage_WORKER_OBJECT_EVICTION - 4 total (0 active), CPU time: mean = 77.478 us, total = 309.910 us
    [state-dump] 	WorkerInfoGcsService.grpc_client.ReportWorkerFailure - 2 total (0 active), CPU time: mean = 17.210 us, total = 34.420 us
    [state-dump] 	Subscriber.HandlePublishedMessage_GCS_WORKER_DELTA_CHANNEL - 2 total (0 active), CPU time: mean = 4.885 us, total = 9.770 us
    [state-dump] 	CoreWorkerService.grpc_client.Exit - 2 total (0 active), CPU time: mean = 12.753 us, total = 25.506 us
    [state-dump] 	InternalPubS