# Run ETL with Status Tracking


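**Challenge**

We want to track whether the ETL succeeded for each individual file.

**Solution**

The AWS Glue ETL programming model has three key concepts: Input -> Transformation -> Output. The smallest unit of status tracking is a single Input. When the source is S3, each Input is a single file.

So all we need to do is wrap the ETL logic for a single file in a function. Before calling the function we create a DynamoDB record for the file, and after the call we update its status attribute depending on whether an exception was raised. A query against DynamoDB then tells us which files succeeded and which failed.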
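The pattern described above can be sketched without any AWS dependency. This is a minimal illustration, not the notebook's implementation: a plain dict stands in for the DynamoDB table, `run_with_tracking` and `fake_etl` are hypothetical names, the bucket name is a placeholder, and the status codes 0/1/2 are the ones this document uses for todo/failed/success.

```python
from typing import Callable, Dict, Iterable

# Status codes as used in this document: 0 = todo, 1 = failed, 2 = success
TODO, FAILED, SUCCESS = 0, 1, 2


def run_with_tracking(
    inputs: Iterable[str],
    etl_func: Callable[[str], None],
    store: Dict[str, int],
) -> Dict[str, int]:
    """Run etl_func once per input and record the outcome in store."""
    for key in inputs:
        # Create the tracking record before calling the ETL function
        status = store.setdefault(key, TODO)
        if status != TODO:
            continue  # already handled by an earlier run
        try:
            etl_func(key)
            store[key] = SUCCESS
        except Exception:
            store[key] = FAILED
    return store


def fake_etl(key: str) -> None:
    """Hypothetical ETL step that fails for inputs whose name contains 'bad'."""
    if "bad" in key:
        raise ValueError(f"cannot process {key}")


tracking = run_with_tracking(
    ["s3://example-bucket/good-1.json", "s3://example-bucket/bad-1.json"],
    fake_etl,
    {},
)
print(tracking)
```

Reading the dict afterwards plays the role of the DynamoDB lookup: every key maps to the final status of that input, and inputs that already have a non-todo status are left untouched on a rerun.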

We use a very simple JSON-to-Parquet ETL as the example. The data is synthetic and simulates bank transaction records. See the ``dataset.ipynb`` file for details.

In [1]:
import sys
import pyspark.sql.functions as sql_funcs
from pyspark.context import SparkContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import *

# Create SparkContext
sparkContext = SparkContext.getOrCreate()

# Create Glue Context
glueContext = GlueContext(sparkContext)

# Get spark session
spark = glueContext.spark_session

# Resolve job parameters
# Uncomment the following lines when running as a Glue ETL job
# args = getResolvedOptions(sys.argv, ["JOB_NAME"])
# job = Job(glueContext)
# job.init(args["JOB_NAME"], args)


In [2]:
from s3pathlib import S3Path

class Config:
    region = "us-east-2"
    source_bucket = "aws-data-lab-sanhe-for-everything-us-east-2"
    source_prefix = "poc/learn-big-data-on-aws/glue-job-examples/04-glue-job-best-practice/status-tracking/bank_transaction/source/"
    target_bucket = "aws-data-lab-sanhe-for-everything-us-east-2"
    target_prefix = "poc/learn-big-data-on-aws/glue-job-examples/04-glue-job-best-practice/status-tracking/bank_transaction/target/"
    
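    n_file = 100 # total number of files
    n_rows_per_file = 1000 # number of rows in each file
    n_acc = 20000 # number of simulated bank accounts transferring money to each other
    failed_rate = 10 # percentage of files generated as "bad" files, e.g. 5 means 5%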
    
    @property
    def s3path_source(self) -> S3Path:
        return S3Path(self.source_bucket, self.source_prefix)

    @property
    def s3path_target(self) -> S3Path:
        return S3Path(self.target_bucket, self.target_prefix)
    
config = Config()




In [5]:
from datetime import datetime


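## Define DynamoDB Schema

DynamoDB is a high-performance, auto-scaling key-value store that requires no infrastructure management, which makes it a good fit for recording the status of each S3 file. We use 0 for TODO, 1 for failed, and 2 for succeeded.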
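As a small illustration of how these numeric codes can be read back, the sketch below maps them to names and counts records per status. It uses an in-memory dict in place of the DynamoDB table; `FileStatus`, `summarize`, and the example URIs are hypothetical and not part of the notebook.

```python
from collections import Counter
from enum import IntEnum
from typing import Dict


class FileStatus(IntEnum):
    """Readable names for the status codes described above."""
    TODO = 0
    FAILED = 1
    SUCCESS = 2


def summarize(records: Dict[str, int]) -> Dict[str, int]:
    """Count tracking records per status name."""
    return dict(Counter(FileStatus(code).name for code in records.values()))


# Placeholder records: S3 URI -> status code
records = {
    "s3://example-bucket/a.json": 2,
    "s3://example-bucket/b.json": 1,
    "s3://example-bucket/c.json": 0,
    "s3://example-bucket/d.json": 2,
}
summary = summarize(records)
print(summary)
```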

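One caveat: the code below can create the DynamoDB table from a local development environment on a laptop. On a Glue Dev Endpoint, however, the runtime is Glue 1.0 (Python 3.6), and its bundled botocore is too old to support some of the parameters that pynamodb passes to the create_table API. **We therefore recommend creating the DynamoDB table locally in advance with the code below.**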

In [22]:
import pynamodb
from pynamodb.models import Model
from pynamodb.connection import Connection
from pynamodb.attributes import UnicodeAttribute, NumberAttribute
from pynamodb.indexes import GlobalSecondaryIndex, KeysOnlyProjection

# create boto3 dynamodb client connection with default AWS profile
connection = Connection()

class Status:
    todo = 0
    failed = 1
    success = 2


class StatusIndex(GlobalSecondaryIndex):
    class Meta:
        index_name = "status-index"
        projection = KeysOnlyProjection()

    status = NumberAttribute(hash_key=True)
    s3uri = UnicodeAttribute()


# Data model that tracks the ETL status of each S3 file
class Tracker(Model):
    class Meta:
        """
        declare metadata about the table
        """
        table_name = "learn_big_data_on_aws_glue_tracker"
        region = "us-east-2"

        # billing mode
        # doc: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html
        # pay as you go mode
        billing_mode = pynamodb.models.PAY_PER_REQUEST_BILLING_MODE

        # provisioned mode
        # write_capacity_units = 10
        # read_capacity_units = 10

    # define attributes
    s3uri = UnicodeAttribute(hash_key=True)
    status = NumberAttribute(default=Status.todo)

    status_index = StatusIndex()

# Create the DynamoDB table if it does not exist; if it already exists, this call does nothing.
# Don't run this with the PySpark magic kernel on a Glue 1.0 Dev Endpoint: its botocore version
# is too old and doesn't support the ``BillingMode`` argument.
Tracker.create_table(wait=True)


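The ``from_options`` API in Glue / PySpark can read an entire S3 folder. Spark then has the nodes read the files in parallel and exposes all the data as one large DataFrame. The drawback is that once the data is in that DataFrame, you can no longer tell which row came from which file.

If you use the ``from_catalog`` API, you can use ``input_file_name`` together with ``withColumn`` to get the source file name and add it as a column.

In either case, though, you cannot run the ETL transformation file by file and mark a file as failed in DynamoDB as soon as it errors out.

So, to get better tracking, we wrap the logic that processes a single file in a function, iterate over all the S3 files, and surround each call with try/except. If an exception is raised, we mark the file as failed. The cost is that we give up parallelism across files, so processing is slower; the benefit is detailed tracking for every file. This approach works best when each file is fairly large, so that the relative performance loss is small. If the files are all small, you may be better off doing the transformation in Lambda.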

```scala
var df = glueContext.getCatalogSource(
  database = database,
  tableName = table,
  transformationContext = s"source-$database.$table"
).getDynamicFrame()
 .toDF()
 .withColumn("input_file_name", input_file_name())

glueContext.getSinkWithFormat(
  connectionType = "s3",
  options = JsonOptions(Map(
    "path" -> args("DST_S3_PATH")
  )),
  transformationContext = "",
  format = "parquet"
).writeDynamicFrame(DynamicFrame(df, glueContext))

```

In [24]:
EXPECTED_COLUMNS = {"trans_id", "from_acc", "to_acc", "balance", "created_time"}


def run_glue_etl_logic(s3path: S3Path):
    """Run the JSON -> Parquet ETL for a single S3 file; raise an exception on bad input."""
    #-----------------------------------------------------------------------------
    # Step 1. Read the Data
    #-----------------------------------------------------------------------------
    gdf = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options=dict(
            paths=[s3path.uri,],
            recurse=True,
        ),
        format="json",
        format_options=dict(multiLine=True),
        transformation_ctx="datasource",
    )

    #-----------------------------------------------------------------------------
    # Step 2. Transform the data
    #-----------------------------------------------------------------------------
    # Validate the schema; raise if the columns are not exactly the expected ones
    columns = gdf.toDF().columns
    if set(columns) != EXPECTED_COLUMNS:
        raise ValueError(f"unexpected columns in {s3path.uri}: {columns}")

    #-----------------------------------------------------------------------------
    # Step 3. Write the data
    #-----------------------------------------------------------------------------
    glueContext.write_dynamic_frame.from_options(
        frame=gdf,
        connection_type="s3",
        connection_options={
            "path": config.s3path_target.uri,
        },
        format="parquet",
        transformation_ctx="DataSink_parquet",
    )


st = datetime.now()
for s3path in config.s3path_source.iter_objects():
    # Get the tracker record for this file, or create it with status = todo
    try:
        tracker = Tracker.get(s3path.uri)
    except Tracker.DoesNotExist:
        tracker = Tracker(s3uri=s3path.uri)
        tracker.save()

    # Skip files already handled by a previous run, so their status is not overwritten
    if tracker.status != Status.todo:
        continue

    try:
        run_glue_etl_logic(s3path)
        tracker.status = Status.success
    except Exception:
        tracker.status = Status.failed
    tracker.save()
et = datetime.now()
elapse = (et - st).total_seconds()
print(f"All files have been processed, elapsed {elapse} seconds...")

All files have been processed, elapsed 142.259167 seconds...
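To put the elapsed time in perspective, the average cost per file can be worked out from the numbers in this run. The sketch assumes the loop visited exactly `n_file = 100` objects, as configured earlier; the variable names are illustrative.

```python
# Elapsed time reported by the run above
elapsed_seconds = 142.259167
# Assumption: the source prefix held exactly the 100 configured files
n_file = 100

avg_seconds_per_file = elapsed_seconds / n_file
print(f"average: {avg_seconds_per_file:.2f} seconds per file")
```

With only 1,000 rows per file, most of that time is likely fixed per-file overhead rather than data volume, which is consistent with the advice above to prefer larger files for this approach.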

In [28]:
gdf_total = glueContext.create_dynamic_frame.from_options(
    connection_type="s3", 
    connection_options=dict(
        paths=[config.s3path_target.uri,],
        recurse=True,
    ),
    format="parquet",
    transformation_ctx="gdf_total",
)


In [30]:
total_rows_in_target = gdf_total.toDF().count()
print(f"Actual number of records across all files in the target directory: {total_rows_in_target}")

Actual number of records across all files in the target directory: 86000

In [27]:
# Count how many files were processed successfully
success_file_count = Tracker.status_index.count(hash_key=Status.success)
expected_target_rows = success_file_count * config.n_rows_per_file
print(f"Expected number of records across all files in the target directory: {expected_target_rows}")

Expected number of records across all files in the target directory: 86000
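The comparison made by the last two cells can be written as a small helper. This is a sketch with a hypothetical function name; the count of 86 successful files is derived from the outputs above (86000 records / 1000 rows per file) and is not printed by the notebook itself.

```python
def reconcile_row_counts(success_file_count: int, rows_per_file: int, actual_rows: int) -> bool:
    """Return True when the target holds exactly the rows of the successful files."""
    expected_rows = success_file_count * rows_per_file
    return expected_rows == actual_rows


# 86 successful files is derived from 86000 / 1000 (see the outputs above)
is_consistent = reconcile_row_counts(
    success_file_count=86,
    rows_per_file=1000,
    actual_rows=86000,
)
print(is_consistent)
```

A mismatch would suggest, for example, that a file marked as successful was only partially written, or that rows from a failed file reached the target.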