# 01 数据整合与清洗

本 Notebook 是智能商品搜索系统的第一步：整合并清洗来自不同来源的数据。

**本节无需 API Key**，纯数据处理操作。

## 系统架构

我们要构建一个智能搜索系统，支持用自然语言搜索商品和评论。整体 pipeline 如下：

```
+-------------------+     +-------------------+
| Product Catalog   |     | Customer Reviews  |
| (Demo1 Parquet)   |     | (Demo3 CSV)       |
+---------+---------+     +---------+---------+
          |                         |
          v                         v
+---------+---------+     +---------+---------+
| Daft: Clean       |     | Daft: Clean       |
| - Deduplicate     |     | - Filter short    |
| - Handle nulls    |     | - Add metadata    |
| - Build text      |     |                   |
+---------+---------+     +---------+---------+
          |                         |
          v                         v
+---------+---------+     +---------+---------+
| embed_text        |     | embed_text        |
| (Notebook 02)     |     | (Notebook 02)     |
+---------+---------+     +---------+---------+
          |                         |
          +----------+--------------+
                     |
                     v
          +----------+----------+
          | LanceDB             |
          | - products table    |
          | - reviews table     |
          +----------+----------+
                     |
                     v
          +----------+----------+
          | Search & Recommend  |
          | (Notebook 03)       |
          +---------------------+
```

两份数据作为独立的搜索维度：产品搜索基于产品描述，评论搜索基于评论文本。

## 1. 读取数据

复用 Demo 1 的产品数据（Parquet 格式）和 Demo 3 的评论数据（CSV 格式）。

In [None]:
import daft
from daft import col

# 读取产品数据（Demo 1）
df_products = daft.read_parquet("../../demo1_daft/data/products.parquet")
print("产品数据 Schema:")
print(df_products.schema())
df_products.limit(3).show()

In [None]:
# 读取评论数据（Demo 3）
df_reviews = daft.read_csv("../../demo3_lancedb/data/reviews.csv")
print("评论数据 Schema:")
print(df_reviews.schema())
df_reviews.limit(3).show()

## 2. 产品数据清洗

Demo 1 的数据生成脚本有意引入了数据质量问题（~2% 重复、~5% description 缺失、~3% brand 缺失），正好用来演示清洗流程。

### 2.1 数据质量概览

先看看原始数据有哪些问题。

In [None]:
# 总行数
total = df_products.count_rows()
print(f"总行数: {total}")

# 重复记录数
distinct = df_products.select("product_id").distinct().count_rows()
print(f"唯一 product_id: {distinct}")
print(f"重复记录: {total - distinct}")

In [None]:
# 缺失值统计
df_products.agg(
    col("description").is_null().cast(daft.DataType.int64()).sum().alias("description_null"),
    col("brand").is_null().cast(daft.DataType.int64()).sum().alias("brand_null"),
    col("rating").is_null().cast(daft.DataType.int64()).sum().alias("rating_null"),
).show()

### 2.2 去重

按 `product_id` 去重，保留第一条记录。

In [None]:
df_products_dedup = df_products.distinct("product_id")
print(f"去重前: {total} 条")
print(f"去重后: {df_products_dedup.count_rows()} 条")

### 2.3 处理缺失值

- `description` 为空的记录无法生成有意义的嵌入，直接过滤
- `brand` 为空的填充为 "未知品牌"
- `rating` 为空的填充为 0（标记为无评分）

In [None]:
df_products_clean = (
    df_products_dedup
    # 过滤 description 为空的记录
    .where(col("description").not_null())
    # 填充 brand 缺失值
    .with_column("brand", col("brand").fill_null("未知品牌"))
    # 填充 rating 缺失值
    .with_column("rating", col("rating").fill_null(0.0))
)

print(f"清洗后: {df_products_clean.count_rows()} 条")

### 2.4 过滤无效记录

过滤价格异常的记录。

In [None]:
df_products_clean = df_products_clean.where(col("price") > 0)
print(f"过滤无效价格后: {df_products_clean.count_rows()} 条")

### 2.5 构建搜索文本

将品牌、子类别、描述拼接为一个 `search_text` 列，作为后续嵌入生成的输入。

这样搜索 "华为手机" 时，品牌和类别信息也会参与语义匹配。

In [None]:
from daft import lit

df_products_final = df_products_clean.with_column(
    "search_text",
    col("brand") + lit(" ") + col("subcategory") + lit(" ") + col("description"),
)

# 查看搜索文本效果
df_products_final.select("product_id", "name", "search_text").limit(5).show()

## 3. 评论数据清洗

Demo 3 的评论数据已经过初步清洗（prepare_data.py），这里做进一步处理。

### 3.1 数据概览

In [None]:
print(f"评论总数: {df_reviews.count_rows()}")

# 类别分布
print("\n类别分布:")
df_reviews.groupby("cat").agg(col("review").count().alias("count")).sort("count", desc=True).show()

# 标签分布
print("标签分布 (1=正面, 0=负面):")
df_reviews.groupby("label").agg(col("review").count().alias("count")).show()

### 3.2 过滤与增强

过滤过短的评论（信息量不足），并添加评论长度列。

In [None]:
df_reviews_clean = (
    df_reviews
    .where(col("review").not_null())
    .where(col("review").length() > 5)
    .with_column("review_length", col("review").length())
)

print(f"清洗前: {df_reviews.count_rows()} 条")
print(f"清洗后: {df_reviews_clean.count_rows()} 条")

In [None]:
# 评论长度统计
df_reviews_clean.agg(
    col("review_length").min().alias("min_len"),
    col("review_length").mean().alias("avg_len"),
    col("review_length").max().alias("max_len"),
).show()

## 4. 数据概览对比

清洗完成，看看两份数据的最终状态。

In [None]:
products_count = df_products_final.count_rows()
reviews_count = df_reviews_clean.count_rows()

print("=== 数据概览 ===")
print(f"产品数据: {products_count} 条")
print(f"  列: product_id, name, category, subcategory, price, rating, brand, description, search_text, ...")
print(f"\n评论数据: {reviews_count} 条")
print(f"  列: cat, label, review, review_length")
print(f"\n合计: {products_count + reviews_count} 条文本需要生成嵌入")

In [None]:
# 产品类别分布
print("产品类别分布:")
df_products_final.groupby("category").agg(
    col("product_id").count().alias("count"),
    col("price").mean().alias("avg_price"),
    col("rating").mean().alias("avg_rating"),
).sort("count", desc=True).show()

## 5. 保存清洗结果

将清洗后的数据保存为 Parquet 格式，供 Notebook 02 使用。

In [None]:
# 保存产品数据
df_products_final.write_parquet("../data/products_clean.parquet")
print("已保存: data/products_clean.parquet")

# 保存评论数据
df_reviews_clean.write_parquet("../data/reviews_clean.parquet")
print("已保存: data/reviews_clean.parquet")

## 小结

本 Notebook 完成了数据整合与清洗：

- **产品数据**（Demo 1）：去重、处理缺失值、过滤无效记录、构建搜索文本
- **评论数据**（Demo 3）：过滤短评论、添加长度元数据

这些操作全部使用 Daft DataFrame API 完成（回顾 Demo 1 的知识）。

下一步，Notebook 02 将用 Daft 的 `embed_text` 为这些文本生成嵌入向量，并写入 LanceDB。