(ray-data-transform)=
# Data Transformation

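At the core of data processing is applying a series of transformations to the data. This section covers:

* how to transform data one row or one batch at a time
* how to group data with `groupby`
* how to shuffle data randomly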

## Transformations

### map() 与 map_batches()

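Ray Data provides two kinds of transformation operations, as shown in {numref}`map-map-batches`:

* Row-level: `Dataset.map()` and `Dataset.flat_map()` transform each row individually, much like other big data frameworks such as Spark or Flink. `map()` takes one row and returns one row; `flat_map()` takes one row and may return zero or more rows.
* Batch-level: [`Dataset.map_batches()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html) packs multiple rows into a batch and transforms the batch as a whole. It takes one batch and returns one batch.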

```{figure} ../img/ch-ray-air/map-map-batches.svg
---
width: 800px
name: map-map-batches
---
map() vs. map_batches()
```
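
The difference between the three APIs can be sketched without a cluster. The helpers below (`apply_map`, `apply_flat_map`, `apply_map_batches`) are hypothetical stand-ins written in plain Python, not Ray functions, and for simplicity a batch here is just a list of rows, whereas Ray Data passes column-oriented batches. The sketch only illustrates how many rows go in and come out of the user function in each case.

```python
from typing import Callable, Dict, List

Row = Dict[str, float]


def apply_map(rows: List[Row], fn: Callable[[Row], Row]) -> List[Row]:
    """Row-level: one input row produces exactly one output row."""
    return [fn(row) for row in rows]


def apply_flat_map(rows: List[Row], fn: Callable[[Row], List[Row]]) -> List[Row]:
    """Row-level: one input row produces zero or more output rows."""
    return [out for row in rows for out in fn(row)]


def apply_map_batches(
    rows: List[Row], fn: Callable[[List[Row]], List[Row]], batch_size: int
) -> List[Row]:
    """Batch-level: rows are grouped and each group is transformed as a whole."""
    result: List[Row] = []
    for start in range(0, len(rows), batch_size):
        result.extend(fn(rows[start:start + batch_size]))
    return result


def subtract_batch_mean(batch: List[Row]) -> List[Row]:
    """Needs the whole batch at once, so it cannot be written as a row function."""
    mean = sum(row["x"] for row in batch) / len(batch)
    return [{"x": row["x"] - mean} for row in batch]


rows = [{"x": 1}, {"x": 2}, {"x": 3}]

doubled = apply_map(rows, lambda row: {"x": row["x"] * 2})
repeated = apply_flat_map(rows, lambda row: [row] * row["x"])
centered = apply_map_batches(rows, subtract_batch_mean, batch_size=2)

print(len(doubled), len(repeated), len(centered))
```

The row count is preserved by `apply_map`, can grow or shrink under `apply_flat_map`, and is decided per batch under `apply_map_batches`.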

We continue to use the New York City taxi dataset to demonstrate both kinds of transformation.

In [1]:
import os
import shutil
import urllib.request

import ray

# ray.is_initialized is a function and must be called
if ray.is_initialized():
    ray.shutdown()

ray.init()

folder_path = os.path.join(os.getcwd(), "../data/nyc-taxi")
download_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-06.parquet"
file_name = download_url.split("/")[-1]
parquet_file_path = os.path.join(folder_path, file_name)
if not os.path.exists(folder_path):
    # Create the folder
    os.makedirs(folder_path)
    print(f"Folder {folder_path} did not exist and has been created.")
else:
    print(f"Folder {folder_path} already exists.")
if not os.path.exists(parquet_file_path):
    # Download and save the Parquet file (also covers an existing but empty folder)
    with urllib.request.urlopen(download_url) as response, open(parquet_file_path, 'wb') as out_file:
        shutil.copyfileobj(response, out_file)
    print("Data downloaded and saved as a Parquet file.")

Folder /Users/luweizheng/Projects/py-101/distributed-python/ch-ray-data/../data/nyc-taxi already exists.


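Read the data into a `Dataset` and first inspect the original format. The `tpep_pickup_datetime` and `tpep_dropoff_datetime` fields hold the passenger pickup and drop-off times, each including both the date and the time.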

In [2]:
dataset = ray.data.read_parquet(parquet_file_path)
dataset.take(1)

[{'VendorID': 1,
  'tpep_pickup_datetime': datetime.datetime(2023, 6, 1, 0, 8, 48),
  'tpep_dropoff_datetime': datetime.datetime(2023, 6, 1, 0, 29, 41),
  'passenger_count': 1,
  'trip_distance': 3.4,
  'RatecodeID': 1,
  'store_and_fwd_flag': 'N',
  'PULocationID': 140,
  'DOLocationID': 238,
  'payment_type': 1,
  'fare_amount': 21.9,
  'extra': 3.5,
  'mta_tax': 0.5,
  'tip_amount': 6.7,
  'tolls_amount': 0.0,
  'improvement_surcharge': 1.0,
  'total_amount': 33.6,
  'congestion_surcharge': 2.5,
  'Airport_fee': 0.0}]

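Use `map()` to reformat these two fields, dropping the date and keeping only the 24-hour time as an `HHMM` string. The most important parameter of `map()` is a user-defined function `fn`, which transforms each row and returns one row.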

In [3]:
def format_datetime(row):
    row['tpep_pickup_datetime'] = row['tpep_pickup_datetime'].strftime("%H%M")
    row['tpep_dropoff_datetime'] = row['tpep_dropoff_datetime'].strftime("%H%M")
    return row

dataset = dataset.map(format_datetime)
dataset.take(1)

[{'VendorID': 1,
  'tpep_pickup_datetime': '0008',
  'tpep_dropoff_datetime': '0029',
  'passenger_count': 1,
  'trip_distance': 3.4,
  'RatecodeID': 1,
  'store_and_fwd_flag': 'N',
  'PULocationID': 140,
  'DOLocationID': 238,
  'payment_type': 1,
  'fare_amount': 21.9,
  'extra': 3.5,
  'mta_tax': 0.5,
  'tip_amount': 6.7,
  'tolls_amount': 0.0,
  'improvement_surcharge': 1.0,
  'total_amount': 33.6,
  'congestion_surcharge': 2.5,
  'Airport_fee': 0.0}]
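
Because a row-level `fn` is an ordinary Python function that takes and returns a dictionary, it can be checked locally on a single row before being handed to `map()`. The sketch below uses a hypothetical function, `add_trip_minutes`, which is not part of the original example. It assumes the two fields still hold `datetime` values, i.e. that it runs before `format_datetime` turns them into strings. The sample values are the ones from the first row shown earlier.

```python
from datetime import datetime


def add_trip_minutes(row: dict) -> dict:
    """Hypothetical row-level transform: add the trip duration in minutes."""
    duration = row["tpep_dropoff_datetime"] - row["tpep_pickup_datetime"]
    row["trip_minutes"] = duration.total_seconds() / 60
    return row


# Pickup and drop-off times of the first row shown above
sample_row = {
    "tpep_pickup_datetime": datetime(2023, 6, 1, 0, 8, 48),
    "tpep_dropoff_datetime": datetime(2023, 6, 1, 0, 29, 41),
}

result = add_trip_minutes(dict(sample_row))
print(round(result["trip_minutes"], 2))  # 20.88
```

On the real data, the same function could be applied with `ray.data.read_parquet(parquet_file_path).map(add_trip_minutes)`.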

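Unlike `map()`, `map_batches()` processes one batch at a time, mimicking how a single-machine program operates on an entire dataset. It is designed to make it easy to migrate existing single-machine code to Ray: users first write a single-machine program, then use Ray Data to run it on a cluster. In `map_batches()`, each batch is represented as a `Dict[str, np.ndarray]`, a `pd.DataFrame`, or a `pyarrow.Table`, matching single-machine business logic written with NumPy, pandas, or Arrow respectively.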
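
A batch is column-oriented: it holds one entry per column, and each entry contains that column's values for every row in the batch. The sketch below illustrates this layout using only the standard library, with plain Python lists standing in for `np.ndarray` columns. `rows_to_batch` and `keep_zero_passenger_trips` are hypothetical helpers, and the second and third rows are made-up values for illustration.

```python
from typing import Any, Dict, List


def rows_to_batch(rows: List[Dict[str, Any]]) -> Dict[str, List[Any]]:
    """Turn a list of row dictionaries into one column-oriented batch."""
    return {key: [row[key] for row in rows] for key in rows[0]}


def keep_zero_passenger_trips(batch: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
    """Batch-level filter: keep only rows whose passenger_count is 0."""
    mask = [count == 0 for count in batch["passenger_count"]]
    return {
        column: [value for value, keep in zip(values, mask) if keep]
        for column, values in batch.items()
    }


rows = [
    {"passenger_count": 1, "trip_distance": 3.4},
    {"passenger_count": 0, "trip_distance": 1.2},  # made-up row
    {"passenger_count": 0, "trip_distance": 0.8},  # made-up row
]

batch = rows_to_batch(rows)
filtered = keep_zero_passenger_trips(batch)
print(batch)
print(filtered)
```

The function receives a whole batch and returns a whole batch, so the output batch may contain fewer rows than the input.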

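The following example filters on the value of one field, keeping only rows whose `passenger_count` is 0. After filtering, the number of rows drops sharply.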

In [4]:
dataset = dataset.limit(200)
print(f"Rows in the dataset: {dataset.count()}")
print()
lambda_filtered_dataset = dataset.map_batches(lambda df: df[df["passenger_count"] == 0], batch_format="pandas")
print(f"Rows after filtering: {lambda_filtered_dataset.count()}")

Rows in the dataset: 200

Rows after filtering: 4




### Tasks and Actors

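As shown above, a transformation essentially executes `fn`: the function receives an input, transforms it, and produces an output. By default, Ray Data runs transformations in parallel using Ray Tasks. Tasks suit stateless computation, where `fn` holds no data that must be reused across different inputs. Stateful computation requires Ray Actors instead. A typical case is loading a machine learning model once and using it to run predictions over all the data. The following example simulates such model inference; the model itself is reused across batches, so the computation is stateful.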

In [5]:
from typing import Dict
import numpy as np
import torch

class TorchPredictor:

    def __init__(self):
        # Runs once per actor: set up the model (an identity model stands in for a real one)
        self.model = torch.nn.Identity()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        # Runs once per batch, reusing the model held by the actor
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32)
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().numpy()
        return batch

# Pass the class itself (not an instance) and run it on a pool of 2 actors
ds = (
    ray.data.from_numpy(np.ones((32, 100)))
    .map_batches(TorchPredictor, compute=ray.data.ActorPoolStrategy(size=2))
)
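
The reason a callable class fits stateful computation can be sketched in plain Python: the setup in `__init__` runs once, while `__call__` runs for every batch and reuses that state. `CountingPredictor` below is a hypothetical stand-in, not a Ray API; a constant scale factor plays the role of loaded model weights, and plain lists play the role of NumPy columns.

```python
from typing import Dict, List


class CountingPredictor:
    """Hypothetical stand-in for a model wrapper used with an actor pool."""

    init_calls = 0  # counts how many times the setup step has run

    def __init__(self) -> None:
        CountingPredictor.init_calls += 1
        self.scale = 2.0  # stands in for model weights loaded once

    def __call__(self, batch: Dict[str, List[float]]) -> Dict[str, List[float]]:
        # Reuse the state created in __init__ for every incoming batch
        batch["output"] = [self.scale * value for value in batch["data"]]
        return batch


predictor = CountingPredictor()  # setup happens once
batches = [{"data": [1.0, 2.0]}, {"data": [3.0]}]
outputs = [predictor(batch) for batch in batches]

print(CountingPredictor.init_calls)
print([out["output"] for out in outputs])
```

With an actor pool of size 2, each of the two actors would construct its own instance, so the setup cost is paid once per actor rather than once per batch.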