# 为什么使用 Ray？
业务背景

* 我们正在以不断增长的速度收集越来越多种类的数据，而在单机上处理这些数据是很困难的。
* 围绕 Python 已经构建了一系列数据处理工具，构建了非常丰富的生态。

因此，我们希望复用已有的丰富的 Python 生态，同时将数据作业拓展为分布式的形式而不需要操心具体的底层实现。

而 Ray 恰好能提供这种能力，甚至可能是最好的选择。

# 什么是 Ray？

Ray 主要有两部分，Ray Core 和 Ray Libraries（在之前版本的官方文档中也称 Ray Air 或 Ray AI Runtime）：

![map-of-ray.svg](images/map-of-ray.svg)

Ray 和 Spark 的创始人 Ion Stoic 在访谈中对此也有过阐述：

从根本上说，Ray Core 是一个 RPC 框架，加上一个 actor 框架，以及一个对象存储，它允许你在不同函数和 actor 之间通过引用高效传递数据。

相较于 Spark，Ray 的层次要低得多。Spark 抽象和隐藏了并行，Ray 揭示和暴露了并行。开玩笑说，如果 Ray 能兑现我希望它能兑现的承诺，有人今天要重新开发一个 Spark，那他应该在 Ray 之上开发 Spark。

但是对于人类来说，编写并行程序会比较困难，串行顺序思考会比较简单，这也是为什么需要 Ray Libraries 的原因，Ray 上的库正是用来抽象和隐藏并行性的。比如你使用 RLlib，你完全不需要知道底层的并行细节，也不需要担心那些细节，相关的强化学习作业就能工作得很好。

# 如何使用 Ray

安装 Ray

In [6]:
! pip install "ray[deault, train, rllib, serve, tune]"
! pip install "transformers==4.30.2"

Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple




Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple


In [2]:
# 初始化 Ray 集群
import ray
ray.init()

2024-01-15 14:04:05,535	INFO worker.py:1715 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


0,1
Python version:,3.9.18
Ray version:,2.9.0
Dashboard:,http://127.0.0.1:8265


In [3]:
# Ray Core API 简单示例
import ray 

import time

database = [
    "Learning", "Ray",
    "Flexible", "Distributed", "Python", "for", "Machine", "Learning"
]

def retrieve(item):
    time.sleep(item / 10.)
    return item, database[item]

def print_runtime(input_data, start_time):
    print(f'Runtime: {time.time() - start_time:.2f} seconds, data:')
    print(*input_data, sep="\n")

# 通过 ray.remote 来装饰函数，后续再通过 .remote 调用该函数
# Ray Core 就会自动将该函数放到 Ray 集群执行，用户可以自由地设置并行度而无需担心底层实现
@ray.remote
def retrieve_task(item):
    return retrieve(item)

start = time.time()
object_references = [
    retrieve_task.remote(item) for item in range(8)
]
data = ray.get(object_references)
print_runtime(data, start)

Runtime: 1.33 seconds, data:
(0, 'Learning')
(1, 'Ray')
(2, 'Flexible')
(3, 'Distributed')
(4, 'Python')
(5, 'for')
(6, 'Machine')
(7, 'Learning')


In [4]:
# Ray Libraries 简单实例，此处使用 Ray Data ，擅长数据的批处理
items = [{"name": str(i), "data": i} for i in range(10000)]
ds = ray.data.from_items(items)
print(ds.show(5))

# 由于 Ray 的版本更新迭代，英文原版提供的 Ray Data 代码在新版本中无法正常执行，此处有些修改
squares = ds.map(lambda x: {"data": x["data"] ** 2})
evens = squares.filter(lambda x: x["data"] % 2 == 0)
evens.count()
evens.show(5)

2024-01-15 14:04:10,822	INFO dataset.py:2488 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2024-01-15 14:04:10,825	INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> LimitOperator[limit=5]
2024-01-15 14:04:10,826	INFO streaming_executor.py:113 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-01-15 14:04:10,828	INFO streaming_executor.py:115 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2024-01-15 14:04:11,282	INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[Map(<lambda>)->Filter(<lambda>)]
2024-01-15 14:04:11,284	INFO streaming_executor.py:113 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-01-15 14:04:11,284	INFO streaming_executor.py:115 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


{'name': '0', 'data': 0}
{'name': '1', 'data': 1}
{'name': '2', 'data': 2}
{'name': '3', 'data': 3}
{'name': '4', 'data': 4}
None


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

2024-01-15 14:04:13,359	INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[Map(<lambda>)->Filter(<lambda>)] -> LimitOperator[limit=5]
2024-01-15 14:04:13,361	INFO streaming_executor.py:113 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-01-15 14:04:13,361	INFO streaming_executor.py:115 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/200 [00:00<?, ?it/s]

{'data': 0}
{'data': 4}
{'data': 16}
{'data': 36}
{'data': 64}


In [5]:
# 关闭 Ray 集群
ray.shutdown()