(ray-data-transform)=
# Data Transformations

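At its core, data processing means applying a series of transformations to data, for example:

* How to transform a single row or a batch of rows
* How to group data with `groupby`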

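## Transformations

### map() and map_batches()

Ray Data provides two kinds of transformation operations, as shown in {numref}`map-map-batches`:

* Row by row: `Dataset.map()` and `Dataset.flat_map()` apply a transformation to each row individually, much like the corresponding operators in other big-data frameworks such as Spark or Flink. `map()` takes one row and returns one row, while `flat_map()` takes one row and may return zero or more rows.
* Batch by batch: [`Dataset.map_batches()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html) packs multiple rows into a batch and transforms the batch as a whole. It takes one batch and returns one batch.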

```{figure} ../img/ch-ray-data/map-map-batches.svg
---
width: 800px
name: map-map-batches
---
map() vs. map_batches()
```
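
To make these shapes concrete, here is a minimal pure-Python sketch that mimics their semantics on a list of dictionaries. It does not use Ray: the helper names `map_rows`, `flat_map_rows`, and `map_in_batches` are hypothetical stand-ins, and a batch is modeled as a dictionary of column lists rather than NumPy arrays.

```python
from typing import Any, Callable, Dict, Iterable, List

Row = Dict[str, Any]
Batch = Dict[str, List[Any]]  # column name -> list of values


def map_rows(rows: Iterable[Row], fn: Callable[[Row], Row]) -> List[Row]:
    """One row in, one row out."""
    return [fn(row) for row in rows]


def flat_map_rows(rows: Iterable[Row], fn: Callable[[Row], List[Row]]) -> List[Row]:
    """One row in, zero or more rows out; the results are flattened."""
    return [out for row in rows for out in fn(row)]


def map_in_batches(rows: List[Row], fn: Callable[[Batch], Batch], batch_size: int) -> List[Row]:
    """Pack rows into columnar batches, transform each batch, unpack to rows."""
    result: List[Row] = []
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        batch = {key: [row[key] for row in chunk] for key in chunk[0]}
        out = fn(batch)
        n = len(next(iter(out.values())))  # number of rows in the output batch
        result.extend({key: out[key][i] for key in out} for i in range(n))
    return result


rows = [{"x": 1}, {"x": 2}, {"x": 3}]
doubled = map_rows(rows, lambda r: {"x": r["x"] * 2})
repeated = flat_map_rows(rows, lambda r: [r] * r["x"])
shifted = map_in_batches(rows, lambda b: {"x": [v + 10 for v in b["x"]]}, batch_size=2)

print(doubled)   # [{'x': 2}, {'x': 4}, {'x': 6}]
print(repeated)  # six rows: x = 1, 2, 2, 3, 3, 3
print(shifted)   # [{'x': 11}, {'x': 12}, {'x': 13}]
```

The batch function sees whole columns at once, which is why vectorized libraries fit naturally into the batch-oriented API.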

We again use the New York City taxi dataset to demonstrate both kinds of transformations.

In [1]:
import os
import shutil
import urllib.request
from typing import Any, Dict

import numpy as np
import pandas as pd
import torch
import ray

if ray.is_initialized():
    ray.shutdown()

ray.init()

folder_path = os.path.join(os.getcwd(), "../data/nyc-taxi")
download_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-06.parquet"
file_name = download_url.split("/")[-1]
parquet_file_path = os.path.join(folder_path, file_name)
if not os.path.exists(folder_path):
    # Create the folder
    os.makedirs(folder_path)
    print(f"Folder {folder_path} did not exist and has been created.")
    # Download and save the Parquet file
    with urllib.request.urlopen(download_url) as response, open(parquet_file_path, 'wb') as out_file:
        shutil.copyfileobj(response, out_file)
    print("Data downloaded and saved as a Parquet file.")
else:
    print(f"Folder {folder_path} already exists; nothing to do.")

2023-12-15 12:08:18,544	INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2023-12-15 12:08:21,451	INFO worker.py:1673 -- Started a local Ray instance.


Folder /Users/luweizheng/Projects/py-101/distributed-python/ch-ray-data/../data/nyc-taxi already exists; nothing to do.


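Read the data into a `Dataset` and first inspect its original format. The `tpep_pickup_datetime` and `tpep_dropoff_datetime` fields are the passenger pickup and drop-off times; each contains both a date and a time.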

In [2]:
dataset = ray.data.read_parquet(parquet_file_path)
dataset.take(1)

2023-12-15 12:08:23,601	INFO util.py:159 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2023-12-15 12:08:26,697	INFO dataset.py:2383 -- Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.
2023-12-15 12:08:26,707	INFO split_read_output_blocks.py:101 -- Using autodetected parallelism=200 for stage ReadParquet to satisfy DataContext.get_current().min_parallelism=200.
2023-12-15 12:08:26,708	INFO split_read_output_blocks.py:106 -- To satisfy the requested parallelism of 200, each read task output is split into 200 smaller blocks.
2023-12-15 12:08:26,712	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> LimitOperator[limit=1]
2023-12-15 12:08:26,713	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), lo

[{'VendorID': 1,
  'tpep_pickup_datetime': datetime.datetime(2023, 6, 1, 0, 8, 48),
  'tpep_dropoff_datetime': datetime.datetime(2023, 6, 1, 0, 29, 41),
  'passenger_count': 1,
  'trip_distance': 3.4,
  'RatecodeID': 1,
  'store_and_fwd_flag': 'N',
  'PULocationID': 140,
  'DOLocationID': 238,
  'payment_type': 1,
  'fare_amount': 21.9,
  'extra': 3.5,
  'mta_tax': 0.5,
  'tip_amount': 6.7,
  'tolls_amount': 0.0,
  'improvement_surcharge': 1.0,
  'total_amount': 33.6,
  'congestion_surcharge': 2.5,
  'Airport_fee': 0.0}]

:::{note}
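Operations in Ray Data are executed lazily: they do not run immediately, but only when an operation that inspects or saves data is encountered. For example, `show()`, `take()`, `iter_rows()`, and `write_parquet()` trigger execution of the pending transformations.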
:::
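
The idea of lazy execution can be illustrated with a toy pipeline in plain Python. This is only a sketch of the concept, not how Ray Data is implemented; the class `LazyPipeline` and its methods are hypothetical names chosen for the illustration.

```python
from typing import Any, Callable, List


class LazyPipeline:
    """A toy stand-in for a lazily evaluated dataset (not Ray's implementation)."""

    def __init__(self, items: List[Any]):
        self._items = items
        self._ops: List[Callable[[Any], Any]] = []

    def map(self, fn: Callable[[Any], Any]) -> "LazyPipeline":
        # Only record the transformation; nothing runs yet.
        new = LazyPipeline(self._items)
        new._ops = self._ops + [fn]
        return new

    def take(self, n: int) -> List[Any]:
        # Consuming data triggers execution of the recorded transformations.
        out = []
        for item in self._items[:n]:
            for fn in self._ops:
                item = fn(item)
            out.append(item)
        return out


calls = []


def add_one(x: int) -> int:
    calls.append(x)  # record that the function really ran
    return x + 1


pipeline = LazyPipeline([1, 2, 3]).map(add_one)
calls_before_take = len(calls)  # still 0: map() only recorded the step
first_two = pipeline.take(2)    # execution happens here

print(calls_before_take, first_two, calls)  # 0 [2, 3] [1, 2]
```

Note that `take(2)` only touches the two items it needs, which is one reason lazy pipelines can avoid unnecessary work.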

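Use `map()` to compute the duration of each trip from the pickup and drop-off timestamps. The most important parameter of `map()` is a user-defined function `fn`, which transforms one row and returns one row. The input data has a schema, and each row is a key-value dictionary: the key is a field name in the schema and the value is the corresponding value.

The following example extracts the duration, distance, and fare of each trip and ignores the other fields for now.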

In [3]:
def transform_row(row: Dict[str, Any]) -> Dict[str, Any]:
    result = {}
    result["trip_duration"] = (row["tpep_dropoff_datetime"] - row["tpep_pickup_datetime"]).total_seconds()
    result["trip_distance"] = row["trip_distance"]
    result["fare_amount"] = row["fare_amount"]
    return result

row_ds = dataset.map(transform_row)
row_ds.take(1)

2023-12-15 12:08:28,216	INFO split_read_output_blocks.py:101 -- Using autodetected parallelism=200 for stage ReadParquet to satisfy DataContext.get_current().min_parallelism=200.
2023-12-15 12:08:28,219	INFO split_read_output_blocks.py:106 -- To satisfy the requested parallelism of 200, each read task output is split into 200 smaller blocks.
2023-12-15 12:08:28,221	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[Map(transform_row)] -> LimitOperator[limit=1]
2023-12-15 12:08:28,222	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-12-15 12:08:28,224	INFO streaming_executor.py:107 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`

[{'trip_duration': 1253.0, 'trip_distance': 3.4, 'fare_amount': 21.9}]
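
Because `fn` is an ordinary Python function, it can be checked locally on a single dictionary before being handed to `map()`. The sketch below repeats the logic of `transform_row` and applies it to the first record shown in the `take(1)` output above, using only the standard library; `sample_row` is a name introduced here for illustration.

```python
from datetime import datetime
from typing import Any, Dict


def transform_row(row: Dict[str, Any]) -> Dict[str, Any]:
    # Same logic as the function passed to map() above.
    return {
        "trip_duration": (row["tpep_dropoff_datetime"] - row["tpep_pickup_datetime"]).total_seconds(),
        "trip_distance": row["trip_distance"],
        "fare_amount": row["fare_amount"],
    }


# The first record from the take(1) output, reduced to the fields that are used.
sample_row = {
    "tpep_pickup_datetime": datetime(2023, 6, 1, 0, 8, 48),
    "tpep_dropoff_datetime": datetime(2023, 6, 1, 0, 29, 41),
    "trip_distance": 3.4,
    "fare_amount": 21.9,
}

result = transform_row(sample_row)
print(result)  # {'trip_duration': 1253.0, 'trip_distance': 3.4, 'fare_amount': 21.9}
```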

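Unlike `map()`, `map_batches()` processes one batch at a time, mimicking how a single-machine program operates on an entire dataset. It is designed mainly to make it easy to migrate existing single-machine programs to Ray: you first write a single-machine program and then use Ray Data to scale it out to a cluster. Each batch passed to `map_batches()` is a `Dict[str, np.ndarray]`, a `pd.DataFrame`, or a `pyarrow.Table`, corresponding to NumPy, pandas, and Arrow respectively.

The following example does much the same as the `map()` example, but operates on each batch as a pandas DataFrame. Note that `.dt.seconds` returns only the seconds component of each timedelta (the days part is dropped), so it matches `total_seconds()` only for trips shorter than one day.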

In [4]:
def transform_df(input_df: pd.DataFrame) -> pd.DataFrame:
    result_df = pd.DataFrame()
    result_df["trip_duration"] = (input_df["tpep_dropoff_datetime"] - input_df["tpep_pickup_datetime"]).dt.seconds
    result_df["trip_distance"] = input_df["trip_distance"]
    result_df["fare_amount"] = input_df["fare_amount"]
    return result_df

batch_ds = dataset.map_batches(transform_df, batch_format="pandas")
batch_ds.take(10)

2023-12-15 12:08:35,838	INFO split_read_output_blocks.py:101 -- Using autodetected parallelism=200 for stage ReadParquet to satisfy DataContext.get_current().min_parallelism=200.
2023-12-15 12:08:35,839	INFO split_read_output_blocks.py:106 -- To satisfy the requested parallelism of 200, each read task output is split into 200 smaller blocks.
2023-12-15 12:08:35,840	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(transform_df)] -> LimitOperator[limit=10]
2023-12-15 12:08:35,842	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-12-15 12:08:35,843	INFO streaming_executor.py:107 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress 

[{'trip_duration': 1253, 'trip_distance': 3.4, 'fare_amount': 21.9},
 {'trip_duration': 614, 'trip_distance': 3.4, 'fare_amount': 15.6},
 {'trip_duration': 1123, 'trip_distance': 10.2, 'fare_amount': 40.8},
 {'trip_duration': 1406, 'trip_distance': 9.83, 'fare_amount': 39.4},
 {'trip_duration': 514, 'trip_distance': 1.17, 'fare_amount': 9.3},
 {'trip_duration': 796, 'trip_distance': 3.6, 'fare_amount': 18.4},
 {'trip_duration': 1136, 'trip_distance': 3.08, 'fare_amount': 19.8},
 {'trip_duration': 527, 'trip_distance': 1.1, 'fare_amount': 10.0},
 {'trip_duration': 237, 'trip_distance': 0.99, 'fare_amount': 6.5},
 {'trip_duration': 171, 'trip_distance': 0.73, 'fare_amount': 5.1}]
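
The batch version above uses `.dt.seconds` while the row version uses `total_seconds()`. The pandas accessor follows the same convention as `datetime.timedelta.seconds` in the standard library, so the difference can be seen without pandas. The sketch below uses three hypothetical durations, not values taken from the dataset.

```python
from datetime import timedelta

short_trip = timedelta(minutes=20, seconds=53)
long_trip = timedelta(days=1, seconds=5)  # hypothetical trip longer than a day
negative = timedelta(seconds=-5)          # e.g. drop-off recorded before pickup

for delta in (short_trip, long_trip, negative):
    # .seconds is only the seconds component; total_seconds() is the full duration
    print(delta.seconds, delta.total_seconds())

# 1253 1253.0
# 5 86405.0
# 86395 -5.0
```

For ordinary trips the two agree, but for durations of a day or more, or for negative durations caused by bad records, only `total_seconds()` gives the real elapsed time.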

When implementing `map()` or `map_batches()`, you can also use a Python lambda expression, that is, an anonymous Python function. For example:

In [5]:
filtered_dataset = dataset.map_batches(lambda df: df[df["trip_distance"] > 4], batch_format="pandas")
print(f"Rows after filtering: {filtered_dataset.count()}")

2023-12-15 12:08:37,202	INFO split_read_output_blocks.py:101 -- Using autodetected parallelism=200 for stage ReadParquet to satisfy DataContext.get_current().min_parallelism=200.
2023-12-15 12:08:37,203	INFO split_read_output_blocks.py:106 -- To satisfy the requested parallelism of 200, each read task output is split into 200 smaller blocks.
2023-12-15 12:08:37,205	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(<lambda>)]
2023-12-15 12:08:37,207	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-12-15 12:08:37,208	INFO streaming_executor.py:107 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`

Rows after filtering: 730352


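### Tasks and Actors

As the examples show, a transformation essentially executes `fn`, a function that takes an input, transforms it, and produces an output. By default, Ray Data uses Tasks to run transformations in parallel. Ray Tasks suit stateless computation, where `fn` holds no data that must be reused across different inputs. Stateful computation requires Ray Actors. A typical case is loading a machine learning model and using it to run predictions on all of the data. The example below simulates model inference; the model itself is reused over and over, so the computation is stateful. The example is for demonstration only: instead of a trained model it uses the identity transformation `torch.nn.Identity()`, which returns its input unchanged.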

In [6]:
class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity()
        self.model.eval()

    def __call__(self, df: pd.DataFrame) -> Dict[str, np.ndarray]:
        pred = {}
        inputs = torch.as_tensor(df['trip_distance'], dtype=torch.float32)
        with torch.inference_mode():
            pred["output"] = self.model(inputs).detach().numpy()
        return pred

pred_ds = batch_ds.limit(100).map_batches(TorchPredictor, compute=ray.data.ActorPoolStrategy(size=2))
pred_ds.take(3)

2023-12-15 12:08:44,228	INFO split_read_output_blocks.py:101 -- Using autodetected parallelism=200 for stage ReadParquet to satisfy DataContext.get_current().min_parallelism=200.
2023-12-15 12:08:44,228	INFO split_read_output_blocks.py:106 -- To satisfy the requested parallelism of 200, each read task output is split into 200 smaller blocks.
2023-12-15 12:08:44,229	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> TaskPoolMapOperator[MapBatches(transform_df)] -> LimitOperator[limit=100] -> ActorPoolMapOperator[MapBatches(TorchPredictor)] -> LimitOperator[limit=3]
2023-12-15 12:08:44,230	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-12-15 12:08:44,230	INFO streaming_executor.py:107 -- Tip: For detailed progress reporti

[{'output': 3.4000000953674316},
 {'output': 3.4000000953674316},
 {'output': 10.199999809265137}]

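Using Actors takes roughly three steps:

1. Create a class with an `__init__()` method and a `__call__()` method. `__init__()` initializes state that can be reused across calls, and `__call__()` implements the transformation. See the `TorchPredictor` class above for reference.
2. Create an `ActorPoolStrategy` that specifies the total number of workers.
3. Call `map_batches()` and pass the `ActorPoolStrategy` as the `compute` argument.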
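
The reason for the callable-class pattern is that setup runs once per worker while the transformation runs once per batch. The plain-Python sketch below simulates this with a fixed pool of two objects and round-robin dispatch. It is an illustration of the pattern under simplified assumptions, not Ray's scheduling logic; `Doubler` and `run_with_pool` are hypothetical names.

```python
from itertools import cycle
from typing import List


class Doubler:
    """Callable class: expensive setup in __init__, per-batch work in __call__."""

    init_count = 0  # class-level counter, only for this demonstration

    def __init__(self):
        Doubler.init_count += 1
        self.factor = 2  # stands in for a loaded model

    def __call__(self, batch: List[float]) -> List[float]:
        return [self.factor * value for value in batch]


def run_with_pool(cls, batches: List[List[float]], size: int) -> List[List[float]]:
    """Hypothetical helper: create `size` workers once, then dispatch round-robin."""
    workers = [cls() for _ in range(size)]
    return [worker(batch) for worker, batch in zip(cycle(workers), batches)]


batches = [[1.0, 2.0], [3.0], [4.0, 5.0], [6.0]]
outputs = run_with_pool(Doubler, batches, size=2)

print(outputs)             # [[2.0, 4.0], [6.0], [8.0, 10.0], [12.0]]
print(Doubler.init_count)  # 2: one initialization per worker, not per batch
```

With a real model, the cost saved is the repeated loading of weights, which would otherwise happen for every batch.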

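## Grouping

Another frequently used primitive in data processing is grouped aggregation, for which Ray Data provides [groupby()](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.groupby.html). You first call `groupby()` to group the data by one or more fields, and then call `map_groups()` to aggregate the data in each group.

The `key` parameter of `groupby(key)` is the field to group by; the `fn` parameter of `map_groups(fn)` operates on the rows of a single group. Ray Data also ships with built-in aggregation functions, such as `sum()`, `max()`, and `mean()`. The example below uses `mean()` to aggregate the `value` field.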

In [7]:
ds = ray.data.from_items([
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4}])
mean_ds = ds.groupby("group").mean("value")
mean_ds.show()

2023-12-15 12:08:49,415	INFO streaming_executor.py:104 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Aggregate] -> LimitOperator[limit=20]
2023-12-15 12:08:49,415	INFO streaming_executor.py:105 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2023-12-15 12:08:49,417	INFO streaming_executor.py:107 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


{'group': 1, 'mean(value)': 1.5}
{'group': 2, 'mean(value)': 3.5}
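
The two-step shape of grouped aggregation (group first, then apply a function per group) can be reproduced in plain Python to check the result above by hand. This sketch does not use Ray; `group_rows` and `map_groups_py` are hypothetical helpers that only mirror the roles of `groupby()` and `map_groups()`.

```python
from collections import defaultdict
from statistics import mean
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def group_rows(rows: List[Row], key: str) -> Dict[Any, List[Row]]:
    """Collect rows that share the same value of `key`."""
    groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return dict(groups)


def map_groups_py(groups: Dict[Any, List[Row]], fn: Callable[[List[Row]], List[Row]]) -> List[Row]:
    """Apply `fn` to each group (in key order) and flatten the results."""
    return [out for key in sorted(groups) for out in fn(groups[key])]


def group_mean(group: List[Row]) -> List[Row]:
    # One output row per group: the group key and the mean of its values.
    return [{"group": group[0]["group"], "mean(value)": mean([r["value"] for r in group])}]


items = [
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4},
]
means = map_groups_py(group_rows(items, "group"), group_mean)
print(means)  # [{'group': 1, 'mean(value)': 1.5}, {'group': 2, 'mean(value)': 3.5}]
```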
