# 02 - Daft + Ray 分布式数据处理

## 学习目标

- 理解 Daft 的 Native Runner 和 Ray Runner 的区别
- 掌握 Daft Ray Runner 的配置方法
- 在 Ray 上执行 Daft 数据处理操作
- 对比 Native Runner 和 Ray Runner 的性能
- 学会使用 Ray Dashboard 监控 Daft 任务

## 1. Daft 执行后端

Daft 支持两种执行后端：

| 特性 | Native Runner | Ray Runner |
|------|--------------|------------|
| 执行方式 | 本地多线程 | 分布式多进程 |
| 适用场景 | 单机小数据 | 大数据/多机集群 |
| 启动开销 | 无 | 需要启动 Ray |
| 扩展性 | 受限于单机 | 可扩展到集群 |
| 配置 | 默认 | `daft.set_runner_ray()` |

```
+------------------+     +------------------+
|  Native Runner   |     |   Ray Runner     |
|                  |     |                  |
| +------+------+  |     | +----+ +----+    |
| |Thread|Thread|  |     | |Proc| |Proc|    |
| +------+------+  |     | +----+ +----+    |
| |Thread|Thread|  |     | |Proc| |Proc|    |
| +------+------+  |     | +----+ +----+    |
|  Single Process  |     |  Ray Workers     |
+------------------+     +------------------+
```

> **注意**：Daft 的 Runner 在一个 Python 进程中只能设置一次，不能在运行时切换。本 Notebook 全程使用 Ray Runner。

## 2. 配置 Ray Runner

In [None]:
import ray
import daft
from daft import col
import time

print(f"Daft 版本: {daft.__version__}")
print(f"Ray 版本: {ray.__version__}")

In [None]:
# 初始化 Ray（本地模式）
ray.init()

# 设置 Daft 使用 Ray Runner
daft.set_runner_ray()

print("Daft Ray Runner 已配置")
print(f"Ray Dashboard: http://127.0.0.1:8265")

In [None]:
# 验证 Ray Runner 工作正常
test_df = daft.from_pydict({"x": [1, 2, 3], "y": [4, 5, 6]})
result = test_df.select(col("x") + col("y")).collect()
print("验证结果:", result.to_pydict())

## 3. 读取 Demo 1 数据

复用 Demo 1 的产品数据集，无需重新生成。

In [None]:
# 读取 Demo 1 的产品数据
df = daft.read_parquet("../../demo1_daft/data/products.parquet")

# 查看 Schema
print("Schema:")
print(df.schema())

In [None]:
# 查看数据概览
print(f"列名: {df.column_names}")
df.show(5)

## 4. Ray Runner 下的数据处理

以下操作与 Demo 1 中的操作类似，但现在是在 Ray 上分布式执行。

### 4.1 过滤和选择

In [None]:
# 筛选高价电子产品
expensive_electronics = (
    df
    .where(col("category") == "电子产品")
    .where(col("price") > 2000)
    .select("product_id", "name", "price", "rating")
    .sort("price", desc=True)
    .limit(10)
)

expensive_electronics.show()

### 4.2 聚合分析

In [None]:
# 按类别统计
category_stats = (
    df
    .groupby("category")
    .agg(
        col("price").mean().alias("avg_price"),
        col("price").max().alias("max_price"),
        col("rating").mean().alias("avg_rating"),
        col("product_id").count().alias("count"),
    )
    .sort("count", desc=True)
)

category_stats.show()

### 4.3 UDF 在 Ray 上执行

In [None]:
# 定义 UDF：价格等级分类
@daft.udf(return_dtype=daft.DataType.string())
def price_tier(price):
    """根据价格划分等级"""
    result = []
    for p in price.to_pylist():
        if p is None:
            result.append(None)
        elif p < 100:
            result.append("低价")
        elif p < 1000:
            result.append("中价")
        elif p < 3000:
            result.append("高价")
        else:
            result.append("奢侈")
    return result

# 在 Ray 上执行 UDF
df_with_tier = df.with_column(
    "price_tier",
    price_tier(col("price")),
)

# 按价格等级统计
tier_stats = (
    df_with_tier
    .groupby("price_tier")
    .agg(
        col("product_id").count().alias("count"),
        col("price").mean().alias("avg_price"),
    )
    .sort("avg_price")
)

tier_stats.show()

## 5. 性能对比

生成较大的数据集，对比 Native Runner 和 Ray Runner 的性能差异。

> **说明**：由于 Daft 不支持运行时切换 Runner，我们通过 `subprocess` 在独立进程中运行 Native Runner 测试。

In [None]:
import subprocess
import os

# 生成 100 万条测试数据
data_script = os.path.abspath("../../demo1_daft/data/generate_data.py")
output_dir = "/tmp/demo2_perf_test"
os.makedirs(output_dir, exist_ok=True)

print("正在生成 100 万条测试数据...")
result = subprocess.run(
    ["python", data_script, "--size", "1000000", "--output", output_dir],
    capture_output=True, text=True
)
print(result.stdout[-200:] if len(result.stdout) > 200 else result.stdout)

In [None]:
# Native Runner 性能测试（在子进程中运行）
native_script = '''
import daft
from daft import col
import time

df = daft.read_parquet("/tmp/demo2_perf_test/products.parquet")

start = time.time()
result = (
    df
    .where(col("price") > 100)
    .groupby("category")
    .agg(
        col("price").mean().alias("avg_price"),
        col("price").max().alias("max_price"),
        col("product_id").count().alias("count"),
    )
    .sort("count", desc=True)
    .collect()
)
elapsed = time.time() - start
print(f"{elapsed:.4f}")
'''

native_times = []
for i in range(3):
    result = subprocess.run(
        ["python", "-c", native_script],
        capture_output=True, text=True
    )
    t = float(result.stdout.strip())
    native_times.append(t)
    print(f"Native Runner 第 {i+1} 次: {t:.4f}s")

native_avg = sum(native_times) / len(native_times)
print(f"Native Runner 平均: {native_avg:.4f}s")

In [None]:
# Ray Runner 性能测试（当前进程）
large_df = daft.read_parquet("/tmp/demo2_perf_test/products.parquet")

ray_times = []
for i in range(3):
    start = time.time()
    result = (
        large_df
        .where(col("price") > 100)
        .groupby("category")
        .agg(
            col("price").mean().alias("avg_price"),
            col("price").max().alias("max_price"),
            col("product_id").count().alias("count"),
        )
        .sort("count", desc=True)
        .collect()
    )
    elapsed = time.time() - start
    ray_times.append(elapsed)
    print(f"Ray Runner 第 {i+1} 次: {elapsed:.4f}s")

ray_avg = sum(ray_times) / len(ray_times)
print(f"Ray Runner 平均: {ray_avg:.4f}s")

In [None]:
# 性能对比总结
print("=" * 50)
print("性能对比（100 万条数据，过滤+聚合+排序）")
print("=" * 50)
print(f"Native Runner 平均: {native_avg:.4f}s")
print(f"Ray Runner 平均:    {ray_avg:.4f}s")

if ray_avg < native_avg:
    print(f"Ray Runner 快 {native_avg / ray_avg:.2f}x")
else:
    print(f"Native Runner 快 {ray_avg / native_avg:.2f}x")
    print("\n提示: 在单机小数据场景下，Native Runner 可能更快，")
    print("因为 Ray 有进程间通信开销。Ray 的优势在大数据和多机场景中体现。")

## 6. 分布式处理模式

Ray Runner 下的一些常见分布式处理模式。

### 6.1 批量 UDF 处理

In [None]:
# 批量 UDF：对描述文本进行处理
@daft.udf(return_dtype=daft.DataType.int64())
def text_length(description):
    """计算文本长度"""
    return [len(d) if d is not None else 0 for d in description.to_pylist()]

@daft.udf(return_dtype=daft.DataType.bool())
def is_high_value(price, rating):
    """判断是否为高价值商品（高评分 + 高价格）"""
    prices = price.to_pylist()
    ratings = rating.to_pylist()
    return [
        (p is not None and r is not None and p > 500 and r > 4.5)
        for p, r in zip(prices, ratings)
    ]

# 应用多个 UDF
enriched_df = (
    df
    .with_column("desc_length", text_length(col("description")))
    .with_column("high_value", is_high_value(col("price"), col("rating")))
)

# 查看高价值商品
high_value_products = (
    enriched_df
    .where(col("high_value") == True)
    .select("product_id", "name", "price", "rating", "desc_length")
    .sort("price", desc=True)
    .limit(10)
)

high_value_products.show()

### 6.2 分区并行处理

In [None]:
# 使用 repartition 控制并行度
# 更多分区 = 更高并行度（但也有更多调度开销）
large_df = daft.read_parquet("/tmp/demo2_perf_test/products.parquet")

# 重新分区为 8 个分区
repartitioned = large_df.repartition(8)

start = time.time()
result = (
    repartitioned
    .where(col("price") > 100)
    .groupby("category", "subcategory")
    .agg(
        col("price").mean().alias("avg_price"),
        col("product_id").count().alias("count"),
    )
    .sort("count", desc=True)
    .collect()
)
elapsed = time.time() - start

print(f"8 分区处理耗时: {elapsed:.4f}s")
print(f"结果行数: {len(result)}")
result.show(10)

## 7. 监控和调试

### Ray Dashboard

Ray 提供了 Web Dashboard 用于监控集群状态和任务执行：

- **地址**：http://127.0.0.1:8265
- **Jobs 页面**：查看所有提交的任务
- **Cluster 页面**：查看节点资源使用情况
- **Actors 页面**：查看活跃的 Actor

### 常用调试方法

| 方法 | 说明 |
|------|------|
| `ray.cluster_resources()` | 查看集群总资源 |
| `ray.available_resources()` | 查看可用资源 |
| `ray.nodes()` | 查看集群节点信息 |
| Dashboard Jobs 页面 | 查看任务执行详情 |
| Dashboard Logs 页面 | 查看 Worker 日志 |

In [None]:
# 查看集群节点信息
nodes = ray.nodes()
for node in nodes:
    print(f"节点: {node['NodeManagerAddress']}")
    print(f"  状态: {'活跃' if node['Alive'] else '离线'}")
    res = node.get('Resources', {})
    print(f"  CPU: {res.get('CPU', 'N/A')}")
    print(f"  内存: {res.get('memory', 0) / 1024 / 1024 / 1024:.2f} GB")

## 8. 清理

In [None]:
# 清理临时测试数据
import shutil

if os.path.exists("/tmp/demo2_perf_test"):
    shutil.rmtree("/tmp/demo2_perf_test")
    print("已清理临时测试数据")

# 关闭 Ray
ray.shutdown()
print("Ray 已关闭")

## 总结

| 概念 | 说明 | 关键 API |
|------|------|----------|
| 配置 Ray Runner | 设置 Daft 使用 Ray 执行 | `daft.set_runner_ray()` |
| 数据读取 | 与 Native Runner 完全相同 | `daft.read_parquet()` |
| 分布式处理 | 过滤/聚合/UDF 自动分布式执行 | 无需额外代码 |
| 分区控制 | 调整并行度 | `df.repartition(n)` |
| 性能特点 | 大数据集优势明显，小数据有开销 | 根据场景选择 Runner |
| 监控 | Ray Dashboard | http://127.0.0.1:8265 |

### 关键收获

1. **零代码改动**：从 Native Runner 切换到 Ray Runner，数据处理代码完全不变
2. **自动分布式**：Daft 自动将查询计划拆分为 Ray Tasks 执行
3. **场景选择**：小数据用 Native Runner，大数据/多机用 Ray Runner

## 练习

1. **分区实验**：尝试不同的分区数（2、4、8、16），观察对性能的影响。
2. **复杂 UDF**：编写一个 UDF 对产品名称进行分词，统计最常见的词。
3. **多表 Join**：在 Ray Runner 下读取 CSV 和 Parquet 两种格式的数据，进行 Join 操作。

## 下一步

继续学习 [03_kubernetes_deployment.ipynb](./03_kubernetes_deployment.ipynb) —— 了解如何在 Kubernetes 上部署 Ray 集群（进阶/可选）。