# What is Find Incremental Matches

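Find Incremental Matches (FIM) is an enhanced version of Find Matches (FM).

**Let's first quickly review what FM is**

FM is essentially an ML clustering model. It can solve problems in many industries, for example:

1. Given a large volume of customer data, use names, phone numbers, addresses and similar fields to find the records that actually belong to the same person, so that you can serve customers better.
2. In web traffic, use the IP address, browser User Agent, cookies and so on to decide which traffic actually comes from the same person, which helps with tracking user behavior and with statistical analysis.

From the user's point of view, you only need to provide two datasets: one is the unlabeled training data, called Records, and the other holds the labels for that data, called Labels. A few clicks in the AWS Console are then enough to train a model. As a user, you only need to make sure that:

1. You must define a Glue Catalog Table for the ``Records`` data; the training process reads the data through this table. This means the Records data can be stored in any data format that the table can describe.
2. The ``Labels`` data must be a **single CSV file**. It must have a header and must use ``,`` as the delimiter. This means that if your data contains commas, you need to wrap every value in ``"``.
3. The ``Records`` data must have exactly one column that serves as the unique identifier of each record. It can be an integer or a string. In the sample data below, ``id`` is the unique identifier: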

| id |   firstname   | lastname |     phone    |
|:--:|:-------------:|:--------:|:------------:|
| p1 |     obama     |  barrack | 123-456-7890 |
| p2 | Hussein Obama |  Barack  | 123-456-7890 |
| p3 |     Trump     |  Donald  | 333-444-5555 |

4. The ``Labels`` data must contain all columns of the original ``Records`` data plus two special columns, ``labeling_set_id`` and ``label``. ``labeling_set_id`` is a batch id: clustering only looks inside a single ``labeling_set_id``, so a record in one ``labeling_set_id`` is never grouped with a record in another. ``label`` marks which records inside a ``labeling_set_id`` belong to the same group. Keep each ``labeling_set_id`` to no more than 300 records, but not too few either: within a ``labeling_set_id``, some records should share a label and some records should have a label that appears only once, so that the ML model can learn from both cases. The sample data below is the ``Labels`` data that matches the ``Records`` above.

| labeling_set_id | label | id |   firstname   | lastname |     phone    |
|:---------------:|:-----:|:--:|:-------------:|:--------:|:------------:|
|        s1       |   l1  | p1 |     obama     |  barrack | 123-456-7890 |
|        s1       |   l1  | p2 | Hussein Obama |  Barack  | 123-456-7890 |
|        s1       |   l2  | p3 |     Trump     |  Donald  | 333-444-5555 |
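
The requirements on ``Labels`` listed above can be checked before uploading the file. The following is a minimal sketch in plain Python, not part of AWS Glue: the function name ``check_labels`` and the constant ``MAX_ROWS_PER_LABELING_SET`` are made up for illustration, and the rules it enforces are only the ones stated in the text (required columns, at most 300 rows per labeling set, at least one shared label and one single-record label per set).

```python
from collections import Counter, defaultdict
from typing import Dict, List

MAX_ROWS_PER_LABELING_SET = 300  # limit quoted in the text above


def check_labels(rows: List[Dict[str, str]], record_columns: List[str]) -> List[str]:
    """Return a list of problems; an empty list means the rows look fine."""
    problems = []
    required = ["labeling_set_id", "label"] + record_columns
    for i, row in enumerate(rows):
        missing = [c for c in required if c not in row]
        if missing:
            problems.append(f"row {i}: missing columns {missing}")
    if problems:
        return problems

    # count how many records carry each label, per labeling set
    labels_per_set = defaultdict(Counter)
    for row in rows:
        labels_per_set[row["labeling_set_id"]][row["label"]] += 1

    for set_id, label_counts in labels_per_set.items():
        n_rows = sum(label_counts.values())
        if n_rows > MAX_ROWS_PER_LABELING_SET:
            problems.append(f"{set_id}: {n_rows} rows, more than {MAX_ROWS_PER_LABELING_SET}")
        if not any(n > 1 for n in label_counts.values()):
            problems.append(f"{set_id}: no label is shared by two or more records")
        if not any(n == 1 for n in label_counts.values()):
            problems.append(f"{set_id}: no record with a label of its own")
    return problems


# the three sample Labels rows from the table above
labels = [
    dict(labeling_set_id="s1", label="l1", id="p1", firstname="obama", lastname="barrack", phone="123-456-7890"),
    dict(labeling_set_id="s1", label="l1", id="p2", firstname="Hussein Obama", lastname="Barack", phone="123-456-7890"),
    dict(labeling_set_id="s1", label="l2", id="p3", firstname="Trump", lastname="Donald", phone="333-444-5555"),
]
problems = check_labels(labels, ["id", "firstname", "lastname", "phone"])
print(problems)
```

For the sample table the list of problems is empty, because ``s1`` contains one label shared by two records and one label used by a single record.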

Once the model is trained, an ML Transform appears in the AWS Glue Console. You can then use it to cluster new ``Records`` data, which has the same format as the ``Records`` used for training. In a Glue Job, read the test data into a ``DynamicFrame`` and transform it with ``FindMatches``. Sample code:

```python
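# Runs inside an AWS Glue job, where ``glueContext`` is already defined.
from awsglueml.transforms import FindMatches

gdf_test = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options=dict(
        paths=[
            "s3://.../test.csv"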
        ],
    ),
    format="csv",
    format_options=dict(
        withHeader=True,
    ),
    transformation_ctx="datasource",
)

gdf_predict = FindMatches.apply(
    frame=gdf_test,
    transformId="tfm-a0ebb0bf0c700c56547843b6834493db",
    transformation_ctx="find_matches",
    computeMatchConfidenceScores=True,
)
```

``FindMatches.apply`` is the key call here. Its result ``gdf_predict`` has two more columns than ``gdf_test``: ``match_id`` and ``match_confidence_score``. Records with the same ``match_id`` are considered by the ML model to belong to the same cluster. The score is a value between 0 and 1. Sample data in ``gdf_predict``:

| id |   firstname   | lastname |     phone    | match_id | match_confidence_score |
|:--:|:-------------:|:--------:|:------------:|:--------:|:----------------------:|
| p1 |     obama     |  barrack | 123-456-7890 |     1    |           1.0          |
| p2 | Hussein Obama |  Barack  | 123-456-7890 |     1    |           1.0          |
| p3 |     Trump     |  Donald  | 333-444-5555 |     2    |           0.5          |
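
To make the meaning of ``match_id`` concrete, here is a small sketch that groups the sample rows above by ``match_id``. It uses plain Python dictionaries instead of a Glue ``DynamicFrame`` so that it runs anywhere; the helper name ``group_by_match_id`` is made up for illustration.

```python
from collections import defaultdict
from typing import Dict, List

# the sample prediction rows from the table above (only the relevant columns)
predicted = [
    {"id": "p1", "match_id": 1, "match_confidence_score": 1.0},
    {"id": "p2", "match_id": 1, "match_confidence_score": 1.0},
    {"id": "p3", "match_id": 2, "match_confidence_score": 0.5},
]


def group_by_match_id(rows: List[dict]) -> Dict[int, List[str]]:
    """Collect the record ids that share the same match_id."""
    clusters = defaultdict(list)
    for row in rows:
        clusters[row["match_id"]].append(row["id"])
    return dict(clusters)


clusters = group_by_match_id(predicted)
print(clusters)  # {1: ['p1', 'p2'], 2: ['p3']}
```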

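**Now let's look at what Find Incremental Matches is**

In practice, the data we want to match usually arrives in batches, and new data keeps coming in. Suppose we have b1 (Batch 1), b2 and b3, and the result for b1 is p1 (Predict 1). After we predict on b1, b2 arrives. If you feed only b2 as input, the prediction does not take the b1 data into account, so records in b2 that should be matched with records in b1 cannot be matched. Alternatively, you can merge b1 and b2 and run the prediction again, but then the ML computation is redone for all of the data and cannot reuse the result for b1.

[The key point of FIM](https://docs.aws.amazon.com/glue/latest/dg/machine-learning-incremental-matches.html) is that it reuses the previous prediction results instead of recomputing them, so each run focuses on the incremental data only, which improves performance. Note, however, that if some records in b2 match records in b1, **the match_id values of the original b1 records will differ between the old and the new results, but records from b1 that shared a match_id before are guaranteed to still share one in the new results**.

Next, let's walk through the whole Find Incremental Matches workflow in practice.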
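
The guarantee described above (the ``match_id`` values may change, but records that were grouped together stay grouped together) can be expressed as a small check. This is a sketch in plain Python, assuming each result is available as a mapping from record id to ``match_id``; the function names and the toy values are made up for illustration.

```python
from collections import defaultdict
from typing import Dict, FrozenSet, Set


def clusters_of(match_ids: Dict[str, int]) -> Set[FrozenSet[str]]:
    """Turn {record_id: match_id} into a set of record-id groups."""
    groups = defaultdict(set)
    for record_id, match_id in match_ids.items():
        groups[match_id].add(record_id)
    return {frozenset(group) for group in groups.values()}


def old_clusters_preserved(old: Dict[str, int], new: Dict[str, int]) -> bool:
    """
    True if every group of records that shared a match_id in ``old``
    still shares a single match_id in ``new``.
    Assumes ``new`` contains every record id that appears in ``old``.
    """
    for group in clusters_of(old):
        new_ids = {new[record_id] for record_id in group}
        if len(new_ids) != 1:
            return False
    return True


# toy data: p4 arrives in the second batch and joins the p1/p2 group
old = {"p1": 0, "p2": 0, "p3": 1}
new = {"p1": 42949672960, "p2": 42949672960, "p3": 7, "p4": 42949672960}
print(old_clusters_preserved(old, new))  # True

# a result that splits p1 and p2 would violate the guarantee
broken = {"p1": 5, "p2": 6, "p3": 7}
print(old_clusters_preserved(old, broken))  # False
```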

# Install Dependencies

All dependencies for this experiment are listed in ``requirements.txt``.

In [2]:
%pip install -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [20]:
# Standard Library
import csv
import string
import random
from typing import List, Dict

# Third Party Library
import attr
from attrs_mate import AttrsClass
from pathlib_mate import Path

from boto_session_manager import BotoSesManager
from s3pathlib import S3Path, context

import pandas as pd
import awswrangler as wr

from faker import Faker
from rich import print as rprint

# Define Some Utility Functions

First, we define some utility functions to keep our code clean.

- ``add_noise_to_text``: randomly changes a few characters in a string. This lets us deliberately introduce errors into firstname, lastname, phone and similar fields, to see whether the ML model can recognize such small errors and put slightly different records into the same group.
- ``Person``: each Person represents one record.
- ``TruePerson``: each TruePerson represents a real person who has a firstname, lastname and phone. Each Person is derived from a TruePerson by adding random noise. We know which TruePerson a Person came from, but FM does not, so we can use this information to validate FM's results.

In [21]:
def add_noise_to_text(text: str, n_noise: int) -> str:
    """
    Randomly add noise character to string.

    Example::

        >>> add_noise_to_text("1234567890", n_noise=3)
        123d56e890
    """
    length = len(text)
    if n_noise > length:
        raise ValueError
    chars = list(text)
    for _ in range(n_noise):
        chars[random.randint(1, length) - 1] = random.choice(string.ascii_lowercase)
    return "".join(chars)


def rand_phone_number() -> str:
    """
    Generate random phone number.

    Example::

        123-456-7890
    """
    numbers = [str(random.randint(0, 9)) for _ in range(10)]
    return "".join(
        numbers[:3]
        + [
            "-",
        ]
        + numbers[3:6]
        + [
            "-",
        ]
        + numbers[6:]
    )


def add_noise_to_phone(phone: str, n_noise: int) -> str:
    """
    Example::

        >>> add_noise_to_phone("111-222-3333", n_noise=1)
        111-222-3353
    """
    chars = list(phone)
    # indices of the digits in "ddd-ddd-dddd" (3 and 7 are the dashes)
    positions = [0, 1, 2, 4, 5, 6, 8, 9, 10, 11]
    for ind in random.sample(positions, n_noise):
        chars[ind] = random.choice(string.digits)
    return "".join(chars)


fake = Faker(locale="en-US")


@attr.define
class Person(AttrsClass):
    """
    A variation of a true person.
    """
    firstname: str = attr.ib()
    lastname: str = attr.ib()
    phone: str = attr.ib()


@attr.define
class TruePerson(AttrsClass):
    """
    represent a concrete human person. he/she could have multiple
    name, phone.
    """
    id: int = attr.ib()
    firstname_list: List[str] = attr.ib()
    lastname_list: List[str] = attr.ib()
    phone_list: List[str] = attr.ib()

    @classmethod
    def random(cls, id: int) -> 'TruePerson':
        firstname = fake.first_name()
        lastname = fake.last_name()
        phone = rand_phone_number()

        firstname_list = [
            firstname,
        ]
        lastname_list = [
            lastname,
        ]
        phone_list = [
            phone,
        ]
        for _ in range(2):
            firstname_list.append(add_noise_to_text(firstname, random.randint(1, 2)))
            lastname_list.append(add_noise_to_text(lastname, random.randint(1, 2)))
            phone_list.append(add_noise_to_phone(phone, random.randint(0, 1)))

        return cls(
            id=id,
            firstname_list=firstname_list,
            lastname_list=lastname_list,
            phone_list=phone_list,
        )

    def to_person(self) -> 'Person':
        return Person(
            firstname=random.choice(self.firstname_list),
            lastname=random.choice(self.lastname_list),
            phone=random.choice(self.phone_list),
        )

In [22]:
print("------ Sample TruePerson ------")
t_person = TruePerson.random(id=1)
rprint(t_person.to_dict())

------ Sample TruePerson ------


In [23]:
print("------ Sample Person ------")
person = t_person.to_person()
rprint(person.to_dict())

------ Sample Person ------
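
To get a feeling for how far a noisy variant drifts from the original, one can compare strings with the standard library's ``difflib``. This is only an intuition aid: it is not how Find Matches scores records internally, and the helper name ``similarity`` is made up for illustration. The sample names come from the generated data shown later in this notebook.

```python
from difflib import SequenceMatcher


def similarity(a: str, b: str) -> float:
    """Case-insensitive similarity ratio between 0.0 and 1.0."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()


# a noisy variant of the same first name vs. an unrelated first name
print(similarity("Donna", "Donjw"))
print(similarity("Donna", "Sara"))
```

A variant with one or two changed characters still scores clearly higher than an unrelated name, which is the kind of signal a matching model can pick up on.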


# Define Your Test Environment Configuration

We need to define the AWS credentials, the S3 bucket we use to store the data, and the Glue Catalog database / table names.

In [24]:
# Define AWS boto3 credentials for session
bsm = BotoSesManager(profile_name="aws_data_lab_sanhe_us_east_2")
context.attach_boto_session(boto_ses=bsm.boto_ses)

# Where you want to store your test data locally
dir_here = Path.cwd()

# Where you want to store your test data on S3
s3path_prefix = S3Path.from_s3_uri(
    "s3://aws-data-lab-sanhe-for-everything-us-east-2/poc/2022-05-18-glue-find-matches/find-incr-matches/"
).to_dir()

# Clear all existing data
for p in dir_here.select_by_ext(".csv"):
    p.remove_if_exists()
s3path_prefix.delete_if_exists()

path_all_csv = Path(dir_here, "all.csv")
path_records_csv = Path(dir_here, "records.csv")
path_labels_csv = Path(dir_here, "labels.csv")
path_tests_csv = Path(dir_here, "tests.csv")
path_predicts_csv = Path(dir_here, "predicts.csv")
path_compares_csv = Path(dir_here, "compares.csv")

s3path_records = S3Path(s3path_prefix, "records")
s3path_labels = S3Path(s3path_prefix, "labels")
s3path_tests = S3Path(s3path_prefix, "tests")
s3path_predicts = S3Path(s3path_prefix, "predicts")

# Glue Catalog config
db_name = "learn_glue_find_matches"
tb_name_records = "records"
tb_name_labels = "labels"
tb_name_tests = "tests"
tb_name_predicts = "predicts"

# pandas.to_csv default keyword arguments
quoting = csv.QUOTE_NONNUMERIC
pd_to_csv_kwargs = dict(
    sep=",",
    index=False,
    header=True,
    quoting=quoting,
)
# awswrangler.s3.to_csv default keyword arguments
wr_to_csv_kwargs = dict(
    sep=",",
    index=False,
    header=True,
    quoting=quoting,
)
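
The ``quoting=csv.QUOTE_NONNUMERIC`` setting is what keeps commas inside values from breaking the CSV. The sketch below shows the effect with the standard library ``csv`` module only; the row, including the value ``"Obama, Jr."``, is made up for illustration.

```python
import csv
import io
from typing import Dict, List


def to_csv_text(rows: List[Dict[str, str]], fieldnames: List[str]) -> str:
    """Serialize rows with every non-numeric value wrapped in double quotes."""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer,
        fieldnames=fieldnames,
        quoting=csv.QUOTE_NONNUMERIC,
        lineterminator="\n",
    )
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


text = to_csv_text(
    [{"id": "p1", "firstname": "Obama, Jr.", "phone": "123-456-7890"}],
    ["id", "firstname", "phone"],
)
print(text)

# reading it back keeps the comma inside the firstname value
parsed = list(csv.DictReader(io.StringIO(text)))
print(parsed[0]["firstname"])  # Obama, Jr.
```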

# Define Your Test Dataset Configuration

We need to define some statistics about our test dataset:

- ``n_label_set_id``: according to the [official document](https://docs.aws.amazon.com/glue/latest/dg/machine-learning.html), the training dataset should be split into "chunks", and each chunk is a "labeling set". Within each labeling set you can have many records (no more than 300, which keeps ML training fast and efficient), and those records can be clustered into different groups. Each group has a "label", and records that belong to the same group are considered a "match". Within each labeling set you should have some "match groups" that contain many records and also some "non-match groups" that contain only one record, which indicates that the record does not match any of the others. **This config defines the total number of labeling sets you want to generate** (the ``n_labeling_set_id`` argument of ``generate_dataset`` in the next section).
- ``n_label_per_set``: the number of records in each labeling set.
- ``n_sample_list``: the records in each labeling set are divided into groups; this config defines the number of records in each group (the ``hist`` argument of ``generate_dataset`` in the next section).
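
As a quick sanity check of these settings, the arithmetic can be written down directly. This sketch uses the parameter names of ``generate_dataset`` from the next section (``n_labeling_set_id`` and ``hist``); the helper ``dataset_shape`` and the constant name are made up for illustration.

```python
from typing import Dict, List

MAX_ROWS_PER_LABELING_SET = 300  # limit quoted in the text above


def dataset_shape(n_labeling_set_id: int, hist: List[int]) -> Dict[str, int]:
    """Derive the dataset size from the number of labeling sets and the group sizes."""
    rows_per_set = sum(hist)
    if rows_per_set > MAX_ROWS_PER_LABELING_SET:
        raise ValueError(f"{rows_per_set} rows per labeling set exceeds {MAX_ROWS_PER_LABELING_SET}")
    return dict(
        rows_per_set=rows_per_set,
        total_rows=n_labeling_set_id * rows_per_set,
        # every labeling set gets its own set of true persons, one per group
        true_persons=n_labeling_set_id * len(hist),
    )


shape = dataset_shape(n_labeling_set_id=1000, hist=[20, 10, 6, 3, 1])
print(shape)  # {'rows_per_set': 40, 'total_rows': 40000, 'true_persons': 5000}
```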


# Generate Training Data

In [25]:
columns_all = "labeling_set_id,label,id,tid,firstname,lastname,phone".split(",")
columns_train = "id,firstname,lastname,phone".split(",")
columns_label = "labeling_set_id,label,id,firstname,lastname,phone".split(",")


def generate_dataset(
    start_id: int,
    start_tid: int,
    n_labeling_set_id: int,
    hist: List[int],
) -> pd.DataFrame:
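    """
    :param start_id: the first value of the globally unique id. For example,
        when generating 1000 rows with a start value of 1, the ids are 1 ~ 1000.
    :param start_tid: the first value of the True Person id. If the dataset
        contains 100 TruePerson and the start value is 1, the tids are 1 ~ 100.
    :param n_labeling_set_id: how many labeling sets the dataset is split into.
        For training, each labeling set is an independent dataset, and records
        in different labeling sets are never matched with each other.
    :param hist: the number of records per True Person inside each labeling set.
        For example, [10, 6, 3, 1] means a labeling set has 4 True Persons,
        with 10, 6, 3 and 1 noisy variants respectively.
    """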
    # global unique identifier for each records
    id = start_id - 1

    # the True Person id, if multiple records has the same tid,
    # they should considered as a match
    # We can use this value to validate the ML predict
    tid = start_tid - 1

    rows = list()
    for labeling_set_id in range(1, 1 + n_labeling_set_id):
        for label, n_sample in enumerate(hist, start=1):
            tid += 1
            true_person = TruePerson.random(tid)
            for _ in range(n_sample):
                id += 1
                person = true_person.to_person()
                row = dict(
                    labeling_set_id=f"LabelingSetId-{labeling_set_id}",
                    label=f"Label-{label}",
                    id=f"PersonId-{str(id).zfill(5)}",
                    tid=f"TrueId-{tid}",
                    firstname=person.firstname,
                    lastname=person.lastname,
                    phone=person.phone,
                )
                rows.append(row)

    df = pd.DataFrame(
        rows,
        columns=columns_all,
    )
    return df

In [26]:
df_all_for_train = generate_dataset(
    start_id=1,
    start_tid=1,
    n_labeling_set_id=1000,
    hist=[20, 10, 6, 3, 1],
)

Because we set ``hist = [20, 10, 6, 3, 1]``, PersonId 1 ~ 20 are the same person, 21 ~ 30 are the same person, 31 ~ 36 are the same person, 37 ~ 39 are the same person, and 40 is a person of their own.
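
The id ranges quoted above follow from a running sum over ``hist``. A minimal sketch (the helper name ``id_ranges`` is made up for illustration):

```python
from itertools import accumulate
from typing import List, Tuple


def id_ranges(hist: List[int], start_id: int = 1) -> List[Tuple[int, int]]:
    """Return the inclusive (first_id, last_id) range of each group in one labeling set."""
    ends = list(accumulate(hist))   # [20, 30, 36, 39, 40]
    starts = [0] + ends[:-1]        # [0, 20, 30, 36, 39]
    return [(start_id + s, start_id + e - 1) for s, e in zip(starts, ends)]


ranges = id_ranges([20, 10, 6, 3, 1])
print(ranges)  # [(1, 20), (21, 30), (31, 36), (37, 39), (40, 40)]
```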

In [27]:
print("------ training dataset with label ------")
print(f"{df_all_for_train.shape[0]} rows, {df_all_for_train.shape[1]} columns")
df_all_for_train.head(40)

------ training dataset with label ------
40000 rows, 7 columns


|   | labeling_set_id |  label  |       id       |    tid   | firstname | lastname |     phone    |
|:-:|:---------------:|:-------:|:--------------:|:--------:|:---------:|:--------:|:------------:|
| 0 | LabelingSetId-1 | Label-1 | PersonId-00001 | TrueId-1 |   Donjw   | Phillips | 854-708-7535 |
| 1 | LabelingSetId-1 | Label-1 | PersonId-00002 | TrueId-1 |   Donna   | Phillipu | 854-708-7535 |
| 2 | LabelingSetId-1 | Label-1 | PersonId-00003 | TrueId-1 |   Donjw   | Phillipu | 854-708-7535 |
| 3 | LabelingSetId-1 | Label-1 | PersonId-00004 | TrueId-1 |   Dvnno   | Phillips | 854-708-7535 |
| 4 | LabelingSetId-1 | Label-1 | PersonId-00005 | TrueId-1 |   Donjw   | Phillipu | 854-708-7535 |
| 5 | LabelingSetId-1 | Label-1 | PersonId-00006 | TrueId-1 |   Donna   | shillnps | 854-708-7535 |
| 6 | LabelingSetId-1 | Label-1 | PersonId-00007 | TrueId-1 |   Dvnno   | Phillips | 854-708-7535 |
| 7 | LabelingSetId-1 | Label-1 | PersonId-00008 | TrueId-1 |   Donjw   | Phillipu | 854-708-7535 |
| 8 | LabelingSetId-1 | Label-1 | PersonId-00009 | TrueId-1 |   Donjw   | shillnps | 854-708-7535 |
| 9 | LabelingSetId-1 | Label-1 | PersonId-00010 | TrueId-1 |   Dvnno   | shillnps | 854-708-7535 |


In [28]:
df_train = df_all_for_train[columns_train]
df_label = df_all_for_train[columns_label]

The training data does not contain the ``labeling_set_id`` and ``label`` columns.

In [29]:
print("------ training dataset WITHOUT label ------")
print(f"{df_train.shape[0]} rows, {df_train.shape[1]} columns")
df_train.head(40)

------ training dataset WITHOUT label ------
40000 rows, 4 columns


|   |       id       | firstname | lastname |     phone    |
|:-:|:--------------:|:---------:|:--------:|:------------:|
| 0 | PersonId-00001 |   Donjw   | Phillips | 854-708-7535 |
| 1 | PersonId-00002 |   Donna   | Phillipu | 854-708-7535 |
| 2 | PersonId-00003 |   Donjw   | Phillipu | 854-708-7535 |
| 3 | PersonId-00004 |   Dvnno   | Phillips | 854-708-7535 |
| 4 | PersonId-00005 |   Donjw   | Phillipu | 854-708-7535 |
| 5 | PersonId-00006 |   Donna   | shillnps | 854-708-7535 |
| 6 | PersonId-00007 |   Dvnno   | Phillips | 854-708-7535 |
| 7 | PersonId-00008 |   Donjw   | Phillipu | 854-708-7535 |
| 8 | PersonId-00009 |   Donjw   | shillnps | 854-708-7535 |
| 9 | PersonId-00010 |   Dvnno   | shillnps | 854-708-7535 |


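The label data does contain the ``labeling_set_id`` and ``label`` columns, and their values follow the ``hist`` setting above. The label data must also contain all of the information in the training data.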

In [30]:
print("------ training dataset label ONLY  ------")
print(f"{df_label.shape[0]} rows, {df_label.shape[1]} columns")
df_label.head(40)

------ training dataset label ONLY  ------
40000 rows, 6 columns


|   | labeling_set_id |  label  |       id       | firstname | lastname |     phone    |
|:-:|:---------------:|:-------:|:--------------:|:---------:|:--------:|:------------:|
| 0 | LabelingSetId-1 | Label-1 | PersonId-00001 |   Donjw   | Phillips | 854-708-7535 |
| 1 | LabelingSetId-1 | Label-1 | PersonId-00002 |   Donna   | Phillipu | 854-708-7535 |
| 2 | LabelingSetId-1 | Label-1 | PersonId-00003 |   Donjw   | Phillipu | 854-708-7535 |
| 3 | LabelingSetId-1 | Label-1 | PersonId-00004 |   Dvnno   | Phillips | 854-708-7535 |
| 4 | LabelingSetId-1 | Label-1 | PersonId-00005 |   Donjw   | Phillipu | 854-708-7535 |
| 5 | LabelingSetId-1 | Label-1 | PersonId-00006 |   Donna   | shillnps | 854-708-7535 |
| 6 | LabelingSetId-1 | Label-1 | PersonId-00007 |   Dvnno   | Phillips | 854-708-7535 |
| 7 | LabelingSetId-1 | Label-1 | PersonId-00008 |   Donjw   | Phillipu | 854-708-7535 |
| 8 | LabelingSetId-1 | Label-1 | PersonId-00009 |   Donjw   | shillnps | 854-708-7535 |
| 9 | LabelingSetId-1 | Label-1 | PersonId-00010 |   Dvnno   | shillnps | 854-708-7535 |


In [31]:
# Dump to local and s3
path_train_csv = Path(dir_here, "01-train.csv")
df_train.to_csv(path_train_csv, **pd_to_csv_kwargs)

path_label_csv = Path(dir_here, "02-label.csv")
df_label.to_csv(path_label_csv, **pd_to_csv_kwargs)

s3path_train_csv = S3Path(s3path_prefix, "01-train", "1.csv")
wr.s3.to_csv(df_train, path=s3path_train_csv.uri, **wr_to_csv_kwargs)
print(f"created {s3path_train_csv.uri}")
print(f"  preview at {s3path_train_csv.uri}")

s3path_label_csv = S3Path(s3path_prefix, "02-label", "1.csv")
wr.s3.to_csv(df_label, path=s3path_label_csv.uri, **wr_to_csv_kwargs)
print(f"created {s3path_label_csv.uri}")
print(f"  preview at {s3path_label_csv.uri}")

created s3://aws-data-lab-sanhe-for-everything-us-east-2/poc/2022-05-18-glue-find-matches/find-incr-matches/01-train/1.csv
  preview at s3://aws-data-lab-sanhe-for-everything-us-east-2/poc/2022-05-18-glue-find-matches/find-incr-matches/01-train/1.csv
created s3://aws-data-lab-sanhe-for-everything-us-east-2/poc/2022-05-18-glue-find-matches/find-incr-matches/02-label/1.csv
  preview at s3://aws-data-lab-sanhe-for-everything-us-east-2/poc/2022-05-18-glue-find-matches/find-incr-matches/02-label/1.csv


# Create and Train the ML Transformation Model

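In this step we train a model from the training data that we generated and uploaded to S3.

Before creating the ML model, we first need to create a Glue Table for Records. I recommend creating this table by hand the first time you run the experiment, to avoid the unpredictable problems a crawler can cause when it creates the table automatically. Be sure to add the ``skip.header.line.count = 1`` property so that the header row is skipped.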
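
If you prefer to script the table instead of clicking through the console, the table definition can be built as a plain dictionary and passed to the Glue ``create_table`` API. The sketch below only builds the dictionary so that it runs without AWS access. The helper name, the bucket path, the all-``string`` column types and the choice of ``OpenCSVSerde`` (picked here because the CSV values are quoted) are assumptions for illustration, not something this notebook prescribes.

```python
from typing import Dict, List


def build_csv_table_input(table_name: str, s3_location: str, columns: List[str]) -> Dict:
    """Build a Glue ``TableInput`` for a quoted CSV folder with a header row."""
    return {
        "Name": table_name,
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {
            "classification": "csv",
            # skip the header line, as required in the text above
            "skip.header.line.count": "1",
        },
        "StorageDescriptor": {
            "Columns": [{"Name": name, "Type": "string"} for name in columns],
            "Location": s3_location,
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.OpenCSVSerde",
                "Parameters": {"separatorChar": ",", "quoteChar": '"'},
            },
        },
    }


table_input = build_csv_table_input(
    table_name="records",
    s3_location="s3://example-bucket/path/to/records/",  # hypothetical location
    columns=["id", "firstname", "lastname", "phone"],
)
print(table_input["Parameters"])

# With credentials configured, the table could then be created with:
#   import boto3
#   boto3.client("glue").create_table(
#       DatabaseName="learn_glue_find_matches", TableInput=table_input
#   )
```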

- AWS Glue Console (the old one, not Glue Studio) -> Job (legacy) -> ML Transforms -> click the Add Transform button
- Transform Properties: enter a name and choose an IAM Role
- Data Source: choose the Glue Table for ``Records``
- Primary Key: specify the primary key
- Tune: specify the CSV file for ``Labels`` (a Glue Table cannot be used here), then click Estimate Quality to check the training quality; this step takes about 3-5 minutes

You can now see a Transform in the Console. Click it to see its Transform ID, which is the argument you pass to ``FindMatches`` when you run predictions.

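# Generate Test Data

Next we generate some test data and run the trained model on it to see how well it works. We can reuse the ``generate_dataset`` function defined earlier to build a new DataFrame. Note that ``start_id`` must not be 1 as before; otherwise the ids would collide with the training data, which would also be logically wrong. Then let's take a look at the dataset.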

In [32]:
df_all_for_test = generate_dataset(
    start_id=40001,
    start_tid=1,
    n_labeling_set_id=1000,
    hist=[20, 10, 6, 3, 1],
).drop(columns=["labeling_set_id", "label"])

df_initial_match = df_all_for_test.sample(frac=0.7)[columns_train]
df_initial_match = df_initial_match.sort_values(by="id")
df_incremental_match = df_all_for_test.loc[df_all_for_test.index.difference(df_initial_match.index),columns_train]

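We split the test data into two parts: 70% for the initial match and 30% for the incremental match. Some records in the two parts necessarily belong to the same TruePerson. After the initial match, many records are tagged with a ``match_id``; after the incremental match, we expect records of the same TruePerson to line up with the ``match_id`` groups assigned in the initial match.

Let's briefly preview the initial and incremental data. The two parts have 28k and 12k rows respectively, which adds up to ``1000 * sum([20, 10, 6, 3, 1]) = 40k`` from above, and they do not overlap.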
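
The 70 / 30 split and the "no overlap" claim can be reproduced without pandas. The following sketch is a standard-library stand-in for the ``sample`` / ``index.difference`` calls above, not the code the notebook runs; the helper name ``split_ids`` and the fixed seed are assumptions for illustration.

```python
import random
from typing import List, Tuple


def split_ids(ids: List[str], frac: float, seed: int = 0) -> Tuple[List[str], List[str]]:
    """Randomly pick ``frac`` of the ids for the initial batch; the rest is incremental."""
    rng = random.Random(seed)
    n_initial = round(len(ids) * frac)
    initial = set(rng.sample(ids, n_initial))
    incremental = [i for i in ids if i not in initial]
    return sorted(initial), incremental


# same id range as the test dataset: PersonId-40001 ~ PersonId-80000
ids = [f"PersonId-{str(i).zfill(5)}" for i in range(40001, 80001)]
initial, incremental = split_ids(ids, frac=0.7)

print(len(initial), len(incremental))          # 28000 12000
print(set(initial).isdisjoint(incremental))    # True
```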

In [33]:
df_initial_match[["id",]]

|       |       id       |
|:-----:|:--------------:|
|   0   | PersonId-40001 |
|   2   | PersonId-40003 |
|   3   | PersonId-40004 |
|   4   | PersonId-40005 |
|   6   | PersonId-40007 |
|  ...  |       ...      |
| 39995 | PersonId-79996 |
| 39996 | PersonId-79997 |
| 39997 | PersonId-79998 |
| 39998 | PersonId-79999 |


In [34]:
df_incremental_match[["id",]]


|       |       id       |
|:-----:|:--------------:|
|   1   | PersonId-40002 |
|   5   | PersonId-40006 |
|   7   | PersonId-40008 |
|   10  | PersonId-40011 |
|   13  | PersonId-40014 |
|  ...  |       ...      |
| 39978 | PersonId-79979 |
| 39981 | PersonId-79982 |
| 39984 | PersonId-79985 |
| 39986 | PersonId-79987 |


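Then we load both the initial and the incremental data to S3. The label information we use for validation has to be removed here. We also keep a local backup of the original data for later validation.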

In [35]:
path_test_csv = Path(dir_here, "03-test.csv")
df_all_for_test.to_csv(path_test_csv, **pd_to_csv_kwargs)

path_initial_csv = Path(dir_here, "04-initial.csv")
df_initial_match.to_csv(path_initial_csv, **pd_to_csv_kwargs)

path_incremental_csv = Path(dir_here, "05-incremental.csv")
df_incremental_match.to_csv(path_incremental_csv, **pd_to_csv_kwargs)

s3path_initial_csv = S3Path(s3path_prefix, "04-initial", "1.csv")
wr.s3.to_csv(df_initial_match, path=s3path_initial_csv.uri, **wr_to_csv_kwargs)
print(f"created {s3path_initial_csv.uri}")
print(f"  preview at {s3path_initial_csv.uri}")

s3path_incremental_csv = S3Path(s3path_prefix, "05-incremental", "1.csv")
wr.s3.to_csv(df_incremental_match, path=s3path_incremental_csv.uri, **wr_to_csv_kwargs)
print(f"created {s3path_incremental_csv.uri}")
print(f"  preview at {s3path_incremental_csv.uri}")

created s3://aws-data-lab-sanhe-for-everything-us-east-2/poc/2022-05-18-glue-find-matches/find-incr-matches/04-initial/1.csv
  preview at s3://aws-data-lab-sanhe-for-everything-us-east-2/poc/2022-05-18-glue-find-matches/find-incr-matches/04-initial/1.csv
created s3://aws-data-lab-sanhe-for-everything-us-east-2/poc/2022-05-18-glue-find-matches/find-incr-matches/05-incremental/1.csv
  preview at s3://aws-data-lab-sanhe-for-everything-us-east-2/poc/2022-05-18-glue-find-matches/find-incr-matches/05-incremental/1.csv


# Run the Initial Find Matches

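The test data is now on S3 as well. Next, we need to start a Jupyter Notebook type job in AWS Glue Studio to run the Initial Find Matches job.

You can find the sample code in ``learn_find_incr_matches_initial_match.ipynb``. Run it up to and including the ``Execute The ML Transformation for Initial Match`` section (that is, run all of the code in that section and stop before the next section, ``Execute the Transformation for Incremental Match``). At that point we have run the Initial Find Matches, obtained the results for the first batch of 28k records, and saved them to S3.

# Run the Incremental Find Matches

Next, we continue running the Glue Job through to the end. At that point we have run the Incremental Matches, obtained the combined results for the first and second batches, and saved them to S3.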
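
For orientation, the incremental step inside the Glue job boils down to one call to ``FindIncrementalMatches.apply``. The sketch below wraps it in a function whose name and parameter names are made up for illustration; only the three keyword arguments shown in the AWS documentation (``existingFrame``, ``incrementalFrame``, ``transformId``) are used. ``existing_frame`` is assumed to be the saved output of the initial match loaded as a ``DynamicFrame``, and ``incremental_frame`` the new batch.

```python
def run_incremental_match(existing_frame, incremental_frame, transform_id: str):
    """
    Match ``incremental_frame`` against the already matched ``existing_frame``.

    Both frames are expected to be Glue DynamicFrames; the return value is a
    DynamicFrame that covers both batches and carries a ``match_id`` column.
    """
    # Deferred import: ``awsglueml`` is only available inside the AWS Glue
    # runtime, so defining this function elsewhere does not fail.
    from awsglueml.transforms import FindIncrementalMatches

    return FindIncrementalMatches.apply(
        existingFrame=existing_frame,
        incrementalFrame=incremental_frame,
        transformId=transform_id,
    )
```

Inside the Glue notebook this would be called with the two frames and the Transform ID of the trained ML Transform.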

# Verify the Results

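Below, we use a ``read_match_results`` function to read the results of both Find Matches runs and write them to local disk for inspection. We need a function because Glue writes the results in parallel and therefore produces many small files, which we have to merge with a bit of code.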

In [13]:
def read_match_results(s3path_prefix: S3Path) -> pd.DataFrame:
    """
    Merge and download the predict output data from S3 to Local.
    The Glue ML job usually uses multiple workers and dumps the results
    to many small files in parallel.
    """
    df_list = list()
    for s3path in S3Path(s3path_prefix).iter_objects():
        with s3path.open("r") as f:
            df = pd.read_csv(f)
            df_list.append(df)

    # concatenate all data frame
    df_match_results = pd.concat(df_list, axis=0)

    # re order by ID
    df_match_results = df_match_results.sort_values(by="id")
    return df_match_results


In [36]:
df_initial_match_results = read_match_results(S3Path(s3path_prefix, "06-match-results").to_dir())
path_initial_match_results_csv = Path(dir_here, "06-initial-match-results.csv")
df_initial_match_results.to_csv(path_initial_match_results_csv, **pd_to_csv_kwargs)

In [37]:
print("------ initial match results ------")
print(f"{df_initial_match_results.shape[0]} rows, {df_initial_match_results.shape[1]} columns")
df_initial_match_results.head(20)



------ initial match results ------
28000 rows, 6 columns


|   |       id       | firstname | lastname |     phone    | match_id | match_confidence_score |
|:-:|:--------------:|:---------:|:--------:|:------------:|:--------:|:----------------------:|
| 0 | PersonId-40001 |    Sara   |   Whste  | 468-400-1568 |     0    |           1.0          |
| 8 | PersonId-40003 |    Sara   |   Whife  | 468-400-1568 |     0    |           1.0          |
| 2 | PersonId-40004 |    Sarr   |   Whste  | 468-400-1568 |     0    |           1.0          |
| 6 | PersonId-40005 |    Syra   |   White  | 468-400-1568 |     0    |           1.0          |
| 5 | PersonId-40007 |    Sarr   |   Whife  | 468-400-1568 |     0    |           1.0          |
| 7 | PersonId-40009 |    Sarr   |   Whife  | 468-400-1568 |     0    |           1.0          |
| 1 | PersonId-40010 |    Sara   |   Whste  | 468-400-1568 |     0    |           1.0          |
| 4 | PersonId-40012 |    Syra   |   Whste  | 468-400-1568 |     0    |           1.0          |
| 9 | PersonId-40013 |    Sara   |   White  | 468-400-1568 |     0    |           1.0          |
| 3 | PersonId-40015 |    Sara   |   Whife  | 468-400-1568 |     0    |           1.0          |


In [38]:
df_incremental_match_results = read_match_results(S3Path(s3path_prefix, "07-incr-match-results").to_dir())
path_incremental_match_results_csv = Path(dir_here, "07-incremental-match-results.csv")
df_incremental_match_results.to_csv(path_incremental_match_results_csv, **pd_to_csv_kwargs)

In [39]:
print("------ incremental match results ------")
print(f"{df_incremental_match_results.shape[0]} rows, {df_incremental_match_results.shape[1]} columns")
df_incremental_match_results.head(20)

------ incremental match results ------
40000 rows, 5 columns


|     |       id       | firstname | lastname |     phone    |   match_id  |
|:---:|:--------------:|:---------:|:--------:|:------------:|:-----------:|
| 122 | PersonId-40001 |    Sara   |   Whste  | 468-400-1568 | 42949672960 |
| 125 | PersonId-40002 |    Sarr   |   Whife  | 468-400-1568 | 42949672960 |
| 114 | PersonId-40003 |    Sara   |   Whife  | 468-400-1568 | 42949672960 |
| 111 | PersonId-40004 |    Sarr   |   Whste  | 468-400-1568 | 42949672960 |
| 113 | PersonId-40005 |    Syra   |   White  | 468-400-1568 | 42949672960 |
| 115 | PersonId-40006 |    Syra   |   White  | 468-400-1568 | 42949672960 |
| 120 | PersonId-40007 |    Sarr   |   Whife  | 468-400-1568 | 42949672960 |
| 126 | PersonId-40008 |    Sara   |   Whste  | 468-400-1568 | 42949672960 |
| 124 | PersonId-40009 |    Sarr   |   Whife  | 468-400-1568 | 42949672960 |
| 128 | PersonId-40010 |    Sara   |   Whste  | 468-400-1568 | 42949672960 |
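
Because the generated test data keeps the ``tid`` of every record in the local backup, the match results can be scored against the ground truth. The following is a minimal sketch under the assumption that both the ``id -> tid`` and the ``id -> match_id`` mappings have been loaded into dictionaries; the function name, the two metric names and the toy data are made up for illustration.

```python
from collections import defaultdict
from typing import Dict


def cluster_quality(true_ids: Dict[str, str], match_ids: Dict[str, int]) -> Dict[str, float]:
    """
    Compare predicted clusters with the known TruePerson ids.

    - unsplit_person_ratio: share of TruePersons whose records all got one match_id
    - pure_cluster_ratio: share of match_ids that contain records of one TruePerson only
    """
    matches_per_person = defaultdict(set)
    persons_per_match = defaultdict(set)
    for record_id, match_id in match_ids.items():
        tid = true_ids[record_id]
        matches_per_person[tid].add(match_id)
        persons_per_match[match_id].add(tid)

    n_unsplit = sum(1 for m in matches_per_person.values() if len(m) == 1)
    n_pure = sum(1 for p in persons_per_match.values() if len(p) == 1)
    return dict(
        unsplit_person_ratio=n_unsplit / len(matches_per_person),
        pure_cluster_ratio=n_pure / len(persons_per_match),
    )


# toy data: records c and d belong to different people but were merged into match 11
true_ids = {"a": "T1", "b": "T1", "c": "T2", "d": "T3"}
match_ids = {"a": 10, "b": 10, "c": 11, "d": 11}
quality = cluster_quality(true_ids, match_ids)
print(quality)  # {'unsplit_person_ratio': 1.0, 'pure_cluster_ratio': 0.5}
```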


Judging from the results, Incremental Match