# RDS to Datalake


## Overview


### About This Project

**项目背景**

企业的业务数据库由于对一致性, 原子性有着较高的要求, 通常都跑在 Relational Database 上. 随着商业模式的进化, 用数据帮助企业做决策对企业的发展起到了决定性的作用. 所以这些业务数据进行分析就成为了刚需. 数据分析的查询往往涉及到扫描大量数据, 直接在业务数据库上进行分析往往会严重影响业务数据库的性能. 这是因为业务数据库的设计本身就必须优先服务于一致性, 原子性, 天生就不适合大数据分析. 所以企业往往会采用将业务数据库在低谷期, 例如周末的半夜, 将 point-in-time 数据导出到数据仓库进行分析. 但是导出这个动作同样会严重影响数据库性能, 所以这个操作的频率不能太高. 这就在工作日, 数据仓库中的数据都是上周的. 对于有些瞬息万变的业务, 例如金融业务, 互联网业务, 这是不能接受的.

本项目是基于作者的职业生涯过程中, 做过的 10 多个将关系数据库 (Oracle, MSSQL, MySQL, Postgres) 中的大数据 (1 billion 到 1 trillion 行之间) 以近实时的方式 (数据延迟在 2 - 5 分钟) 同步到 Data Lake 中的项目, 总结出的一套通用的解决方案. 并为这套解决方案开发了一套软件, 既可以用于帮助客户快速学习这套方案, 也可以自用为具体客户解决具体问题, 基于该软件进行二次开发.

**关于本项目**

这篇文档介绍了如何使用该项目, 练习使用 AWS DMS 将 RDS 中的数据近实时地同步到 S3 DataLake 中. 这个项目的代码库包含了一系列工具, 和自动化脚本, 能够方便地复现每一步关键步骤, 包括:

- 创建所需的基础设施资源, 包括用于存放数据的 S3 Bucket; RDS Database Instance 所需的 Subnet Group, Security Group, Parameter Group; 必要的 IAM Roles; 用于数据同步的 DMS Subnet Group, DMS Replication Instance, DMS Postgres Source Endpoint, DMS S3 Target Endpoint 和 DMS Migration Task; 用于储存数据的 Metadata 的 Glue Catalog Database; 用于执行 ETL 的 Glue Job; 用于自动化调度 ETL Job 以处理不断摄入的增量数据的 Lambda Function.
- 一个用于模拟应用程序不断地对数据库进行写入, 更新操作的脚本. 可以模拟 10TPS (每秒钟 10 个 Transaction) 的业务流量. 我们也可以用异步框架以及多线程技术在单机 Macbook 笔记本上模拟出 3000 TPS+ 的业务流量.
- 一个用于扫描 Database 中的数据以及 DataLake 中的数据, 并进行对比的脚本, 这样可以证明 Database 和 DataLake 中的数据是完全一致的.
- 许多用于测试的脚本, 可以测试整个系统中各个单元的工作情况, 帮助开发者更好的学习. 例如可以测试 Database connection 的脚本, 预览 DMS Output 数据的脚本, 可以对 DataLake 中的数据进行查询的脚本等.

**前置知识**

你需要有一定的 Python 开发经验, 并且能正确安装 [AWS CDK](https://aws.amazon.com/cdk/) 部署工具, 对关系数据库有基本的了解, 有基本的 AWS 开发经验.

### Setup Your Development Environemnt

- MacOS / Linux 电脑, 可以是有界面的远程桌面, 也可以是纯命令行界面的服务器. 推荐使用 AWS Cloud 9.
- NodeJS 18, 我们的 CDK 版本是基于 NodeJS 18 的
- Python 3.10, 我们所用的 ETL Glue Job 4.0 是基于 Python 3.10 的, 有助于

### Install AWS CDK

关于 Node 的安装最好参考 [Node 官方文档](https://nodejs.org/en/download/package-manager/). 以下是我在 MacOS 上安装 Node 的经验.

1. 安装 Node.

```
# 安装最新的 node
brew install node
# 安装特定版本的 node, 推荐安装 18
# 我现在的时间是 2023 年 5 月, 这是一个 LTS, long term support 版本, 支持到 2025 年 4 月,
# 并且 CDK 在这个版本上充分测试过了
brew install node@18

# 如果安装了特定版本的 node, 你需要手动将其加到 PATH 中从而让你的 shell 能找到正确的 node
export PATH="/opt/homebrew/opt/node@18/bin:$PATH"
export LDFLAGS="-L/opt/homebrew/opt/node@18/lib"
export CPPFLAGS="-I/opt/homebrew/opt/node@18/include"

# 该命令可以更新 node
brew upgrade node

# Linux 上的安装取决于你的系统

# 这里要注意, 你如果需要用特定版本的 node, brew 是可以允许你同时安装多个版本的 node,
# 并且将全局的 node 命令绑定到特定版本
# 如果你不小心安装错了 node, 比如直接用 brew install node 安装了
# 那么你重新安装特定版本的 node 之后, 需要重新绑定 simlink, 使得全局的 node 指向
# 的是你需要的那个版本, 下面的命令可以做到这一点
brew link --overwrite node@18
```

2. 安装 Node 下的 CDK. 其他编程语言只是实现了一层壳, 还是需要调用 Node CDK 的 API. 最好参考 [AWS CDK 官网文档](https://docs.aws.amazon.com/cdk/latest/guide/getting_started.html).

```
# 检查 node 版本
node

# 将 cdk cli 安装到 node 全局
npm install -g aws-cdk

# 检查 cdk 版本
cdk --version
```

3. BootStrap 引导程序. 在你第一次在某个 AWS Account 和 Region 中使用 CDK 的时候, 你需要做 Bootstrap. 这个 Bootstrap 的行为是为了在 AWS 中创建一些必要的资源来供 CDK 这个工具本身所使用.


```
# 显式指定 AWS CLI Profile 所对应的 Account 和 Region
cdk bootstrap --profile ${your_aws_cli_profile}
```

4. 在 Python 中安装 aws-cdk-lib

AWS CDK for Python 是以 PyPI 上的第三方包的形式存在的. 在 CDK 1.X 的时候, 你需要安装的 aws-cdk.core. 然后各个服务有相应的子模块, 例如 S3 的是 aws-cdk.aws-s3. 这对于开发者维护每个依赖的版本非常不方便. 从 CDK 2.X 开始, 你可以只安装一个 aws-cdk-lib 就可以了. 而对于还不是 stable 的实验性功能, 你可以通过安装 aws-cdk/aws-lambda-alpha 来使用. 但不推荐在生产环境中使用它们:

```
pip install aws-cdk-lib
```

Reference:

- [CDK Python 入门文档](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html)
- [CDK Python Reference](https://docs.aws.amazon.com/cdk/api/v2/python/modules.html)


## Solution Walkthrough

### How it Works

将关系数据库中的数据以近实时的方式同步到数据湖的过程中涉及到以下核心技术.

**Database Table Schema Design**

关系数据库中的表格中的每一行数据在逻辑上都必然有一个 create_at (创建时间) 和 update_at (更新时间). 无论表中是否有这两个字段, 这两个值在逻辑上都是客观存在的. 当一行被第一次创建的时候, create_at 和 update_at 的值是一样的. 对于业务逻辑而言, 这两个字段会占用额外存储空间, 且通常不是必须的. 但是从合规以及审计, 以及对于数据分析而言, 这两个字段的帮助非常大. 举例来说, 通常业务中的数据量是随着时间不断增加的, 那么我们就可以用 create_at 作为 partition key, 在 DataLake 中将数据打散到不同的 partition. 这样能保证每个 partition 中的数据量是可预估并比较有限的, 并且可以利用 push down predicate 技术减少查询时要扫描的数据量, 以提高查询效率. 另一个例子是 update_at 字段可以用来过滤出在某个时间之后发生过更新的数据, 这样我们就可以轻易的找到增量数据并仅对 DataLake 进行增量更新. 这样做减少了工作量 (传统的数据库导出到数据仓库通常是用全量导出), 并提高了数据同步的速度.

所以我推荐在设计数据模型的时候, 就为每个表都创建这两个字段.

**AWS DMS**

AWS DMS 是一个 2016 年上线, 用于 Database Migration 的服务. 支持非常多种类的数据源以及目标数据存储. 涵盖了关系数据库, NoSQL 数据库, Key Value 数据库, 数据仓库, 对象存储等 20 多个系统之间的数据互通, 是一个非常强大且有用的工具. 它还有 Schema Convertion 工具能将 data model 在 migration 的过程中进行调整, 以及很多其他功能. 在我们这个项目中我们不展开讨论, 我们仅仅用它做 as it is (不做任何数据改变, 原来数据什么样, 最后数据就怎么样) 的 Data Sync.

这个服务的本质是将 Database Migration 抽象为:

- Replication Instance: 是一台对用户不可见的 EC2 虚拟机, 上面安装了 DMS 的软件. 它会真正运行你的 Migration Task, 从 Data Source 中不断地读数据, 并且 Capture Data Change Event (incremental data), 将数据写入到 Data Target 中. 从 2023-07 起, 还提供了 DMS Serverless 选项, 能自动以集群模式运行, 保证高可用. 但这里我们不展开说.
- Source Endpoint: 用于储存 data source 的连接 metadata 等信息.
- Target Endpoint: 用于储存 data target 的连接 metadata 等信息.
- Migration Task: 一个具体的 Migration Task, 本质上是 Replication Instance + Source Endpoint + Target Endpoint 的组合. 这个 Task 会被提交到 Replication Instance 上来执行. Migration Task 有三种模式: 只 Migrate Initial Load, 只 Migrate Incremental Data, 先 Migrate Initial Load, 再 Migrate Incremental Data, 永不停止直到用户手动停止. 在此项目中我们的目标是进行近实时的数据同步, 所以会选择第三种模式.

**Intial Load and Incremental Data**

- **Initial Load**: 首先, 我们需要将某个时间节点的所有数据导出到 S3. 如果你的数据库中已经有很多数据了, 那么这个过程会比较长. 但是不用担心, 这个操作我们只需要做一次, 在这之后我们就会专注于处理有限的增量数据. AWS DMS 服务有这个功能, 可以帮我们做到这个操作.
- **Increment Data**: 凡是支持 Transaction 的 Relational Database 为了保证不丢失数据, 通常都有 Write Ahead Log (WAL) 的功能 (这个功能在不同的数据库中有不同的名字, 但是原理是一模一样的). 它的目的是将接收到任何数据变更请求 (insert, update, delete) 时, 就立刻将这个操作落盘持久化到 (WAL) 中, 留下一个记录, 然后再进行实际的数据变更操作. 这样无论任何时候因为任何原因 (网络, 断电) 导致操作失败, 数据库都能根据 WAL 进行恢复. 而我们可以将 WAL 导出到一个 Data Stream 中然后将所有的变更操作严格按照顺序 dump 到 S3 中. AWS DMS 服务有这个功能, 能帮我们自动对数据库进行必要的配置, 并设置好 Data Stream, 以及处理数据的导出.

所以, 将于数据库中的数据导出的操作将由 AWS DMS 负责. 接下来我们来讲解如何对导出的数据进行处理, 并 Load 到 DataLake 中.

**AWS Glue and Apache Hudi**

- [AWS Glue](https://aws.amazon.com/glue/) 是一个无服务器的 ETL 服务. 允许用户无需创建和管理基础设施和服务器就能进行大数据处理. 利用 AWS Glue,  我们可以专注于 ETL 代码的实现并直接在 AWS 管理的集群上运行大数据处理工作.
- 而 [Apache Hudi](https://hudi.apache.org/) 是一款由 Uber 公司开发, 后来捐献给 Apache 开源软件基金会的, 专注于解决数据湖中的行级数据更新和删除, 以及让对数据湖的每一个操作都是 Transactional, 原子的, 可回滚的数据处理框架. 它本质是一个软件库, 无需配置任何集群. 它依赖于 Spark 计算引擎实现了行级数据更新和删除操作. 对于用户而言你只需要在运行 Spark Job 的时候启用了 Hudi 包, 并且在读写数据的时候传入以下 Hudi options 参数就可以了, 非常方便.

这两个技术配合起来组成了我们的 ETL Pipeline, 使得我们能首先读取 Initial Load 数据并将其写入 Data Lake. 而有了 Hudi, 对于 Incremental Data 的处理也变得极其简单, 仅仅是读取 Incremental data, 并直接写入 Data Lake, 而所有的 update, delete 则交给 Hudi 来处理即可, 用户无需干预.

**AWS Lambda**

[AWS Lambda](https://aws.amazon.com/lambda) 是一个无服务器, 事件驱动的计算服务. 用户无需创建和管理基础设施, 虽然它基于容器, 但用户甚至都不需要构建容器, 只需要专注于代码就可以部署高可用, 高并发的应用的云服务. 由于对 Incremental Data 的处理必须要按照顺序执行, 并且两次执行的数据之间最好不要有 overlap, 这需要一定的调度才能保证. AWS Lambda 则可以作为一个调度器来调度 ETL Job, 并担负一些额外的例如 logging, error handling, notification 等工作.

**AWS Athena**

[AWS Athena](https://aws.amazon.com/athena/) 是一个无服务器的大数据分析引擎. 用户无需创建和管理基础设施, 就可以用 SQL 对超大数据进行分析. 当我们完成了 Sync 之后, 分析师就可以用 Athena 对 DataLake 中的数据进行分析, 而无需担心影响业务数据库的性能了, 并且能获得更快的查询速度.

**Conclusion**

总结下来, 如果我们的数据模型设计合理, 我们就可以用 DMS 来将 initial load 和 incremental data 都导出到 S3 作为中间态数据, 然后就可以用 Glue + Hudi 将导出的数据 load 到 DataLake, 分析师就可以用 Athena 工具进行数据分析了. 其中用到的所有技术都是 Serverless 的, 用户无需购买硬件, 也无需管理服务器, 就可以长期稳定地将 Database 中的数据同步到 DataLake 中.

### Architecture Diagram

TODO

## Code Folder Structure Walkthrough

## Create Infrastructure

- S3 Bucket:
    - S3 artifacts bucket: store deployment artifacts, such as Lambda Function deployment package, Glue Job script.
    - S3 data bucket: store data lake data or all intermidiate data.
- IAM Roles:
    - ``dms-vpc-role``: allow DMS to use VPC
    - ``dms-cloudwatch-logs-role``: (Optional, better to have) allow DMS to use CloudWatch Logs, otherwise, you cannot see any log if your migration tasks fail.
    - DMS S3 Endpoint Role: allow DMS to access the S3 Location.
    - Lambda Role: allow the Glue ETL Job orchestrator Lambda Function to call AWS Services.
    - Glue Role: allow the Glue ETL Job to read and write data and manage AWS Glue Catalog.
- RDS Database:
    - DB Subnet Group: define which VPC and subnets to put your database.
    - Parameter Group: define the database instance parameter, in this project, we need to customize this to enable Postgres WAL (write ahead log).
    - RDS Postgres Database Instance: the source relational database for this project.

## Testing

### Test Database Connection

### Test Endpoint Connection

## Run Data Faker Script

## Run Migration Task

## Understand the ETL Job Implementation

## Verify the Data Consistency

## Conclusion