## Serial Python

In [1]:
import math
import random
import time

def sample(num_samples):
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi(num_samples):
    start = time.time()
    num_inside = sample(num_samples)
    
    print("pi ~= {}".format((4*num_inside)/num_samples))
    print("Finished in: {:.2f}s".format(time.time()-start))

In [5]:
approximate_pi(100000000)

pi ~= 3.14165028
Finished in: 97.46s


## Parallel on one machine using multiprocessing.Pool

In [6]:
import math
import random
import time

def sample(num_samples):
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi_parallel(num_samples):
    from multiprocessing.pool import Pool
    pool = Pool()
    
    start = time.time()
    num_inside = 0
    sample_batch_size = 100000
    for result in pool.map(sample, [sample_batch_size for _ in range(num_samples//sample_batch_size)]):
        num_inside += result
        
    print("pi ~= {}".format((4*num_inside)/num_samples))
    print("Finished in: {:.2f}s".format(time.time()-start))

In [8]:
approximate_pi_parallel(100000000)

pi ~= 3.14178832
Finished in: 30.23s


## Distributed on a 10-node cluster using Ray

In [2]:
!pip3 install ray
!pip3 install psutil
!pip3 install setproctitle

Collecting ray
  Downloading ray-0.8.6-cp37-cp37m-macosx_10_13_intel.whl (53.4 MB)
[K     |████████████████████████████████| 53.4 MB 7.4 MB/s eta 0:00:011
Collecting click>=7.0
  Using cached click-7.1.2-py2.py3-none-any.whl (82 kB)
Collecting filelock
  Using cached filelock-3.0.12-py3-none-any.whl (7.6 kB)
Collecting msgpack<2.0.0,>=0.6.0
  Using cached msgpack-1.0.0-cp37-cp37m-macosx_10_13_x86_64.whl (78 kB)
Collecting google
  Downloading google-3.0.0-py2.py3-none-any.whl (45 kB)
[K     |████████████████████████████████| 45 kB 4.1 MB/s eta 0:00:011
Collecting redis<3.5.0,>=3.3.2
  Using cached redis-3.4.1-py2.py3-none-any.whl (71 kB)
Collecting py-spy>=0.2.0
  Downloading py_spy-0.3.3-py2.py3-none-macosx_10_7_x86_64.whl (1.5 MB)
[K     |████████████████████████████████| 1.5 MB 18.0 MB/s eta 0:00:01
[?25hCollecting aiohttp
  Downloading aiohttp-3.6.2-cp37-cp37m-macosx_10_13_x86_64.whl (642 kB)
[K     |████████████████████████████████| 642 kB 9.2 MB/s eta 0:00:01
[?25hCollectin

In [4]:
import time

import ray

# ray.init() のように明示的に指定しなかった場合自動的にリソース数が決定されます
ray.init(webui_host='127.0.0.1')

# 時間計測をより正確にする都合上Rayの起動を少し待つ
time.sleep(1)

2020-07-13 11:07:25,167	INFO resource_spec.py:212 -- Starting Ray with 4.74 GiB memory available for workers and up to 2.39 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-07-13 11:07:26,368	INFO services.py:1165 -- View the Ray dashboard at [1m[32m127.0.0.1:8265[39m[22m


In [5]:
@ray.remote
def func(x):
    time.sleep(3)
    return x

begin_time = time.time()
res1, res2 = func.remote(1), func.remote(2)
print(res1) # 出力例: ObjectID(45b9....) 
print(ray.get(res1), ray.get(res2)) # 出力: 1 2
# ray.getはリストを受けとることもできる
print(ray.get([res1, res2])) # 出力: [1, 2]
end_time = time.time()
print(end_time - begin_time) # 3秒ぐらい

ObjectID(45b95b1c8bd3a9c4ffffffff010000c001000000)
1 2
[1, 2]
3.0192511081695557


In [6]:
import math
import random
import time


@ray.remote
def sample(num_samples):
    num_inside = 0
    for _ in range(num_samples):
        x, y = random.uniform(-1, 1), random.uniform(-1, 1)
        if math.hypot(x, y) <= 1:
            num_inside += 1
    return num_inside

def approximate_pi_distributed(num_samples):
    from ray.util.multiprocessing.pool import Pool # NOTE: Only the import statement is changed.
    pool = Pool()
        
    start = time.time()
    num_inside = 0
    sample_batch_size = 100000
    for result in [ sample.remote(sample_batch_size) for _ in range(num_samples//sample_batch_size)]:
        print(result)
        print(ray.get(result))
#         num_inside += result
        
    print("pi ~= {}".format((4*num_inside)/num_samples))
    print("Finished in: {:.2f}s".format(time.time()-start))


In [None]:
approximate_pi_distributed(100_000_000)

ObjectID(b944ee5bb38dd1a5ffffffff010000c001000000)


2020-07-13 11:08:07,238	INFO (unknown file):0 -- gc.collect() freed 60 refs in 0.0715512270000005 seconds
