# Using DataBuilder for SLModel Training in SecretFlow


The following code is for demonstration only. It is **NOT for production** use because of system security concerns; please **DO NOT** use it directly in production.

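SecretFlow supports vertical federated learning through SLModel. Using DeepFM training in a recommendation scenario as the example, this document shows how to use a custom DataBuilder in SLModel.  
*Note: for DeepFM itself and how the model is split between parties, see the DeepFM documentation.*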

## Environment setup

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import secretflow as sf

# In case you have a running secretflow runtime already.
sf.shutdown()
sf.init(['alice', 'bob', 'charlie'], address="local", log_to_driver=False)
alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')

2023-04-07 17:58:11,452	INFO worker.py:1538 -- Started a local Ray instance.


## Dataset

We use the classic MovieLens dataset for this demo.  
MovieLens is an open recommender-system dataset that contains movie ratings and movie metadata.  
[Dataset download](https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/movielens/ml-1m.zip)

We split the data between the two parties as follows:
- alice: "UserID", "Gender", "Age", "Occupation", "Zip-code"
- bob:   "MovieID", "Rating", "Title", "Genres", "Timestamp"

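## Defining the DataBuilder function

A custom DataBuilder is useful when the functionality of the existing FedDataFrame and FedNdarray does not meet your needs and more advanced customization is required.  
In split learning, where the parties' data must be aligned, DataBuilder support is currently offered only for CSV data. A custom DataBuilder defines how each row of data is processed, and SLModel handles the data the way you define.

Points to note:  
- After conversion the MovieLens data is in CSV format, but it has to be handled as sparse features, so the DataBuilder converts the columns into a dictionary keyed by column name.
- On Bob's side, the rating is thresholded into a binary label.
- Inside a DataBuilder you can define a custom function and apply it to every element with `dataset.map`.
- Because vertical mode requires the two parties' data to be aligned, only CSV mode is supported for now.
- In CSV mode the builder only needs to return a Dataset. The Dataset must have its batch size and repeat count set; SLModel infers `steps_per_epoch` from the Dataset.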
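
To make the batch-size and repeat arithmetic concrete, here is a minimal sketch in plain Python. The helper names `batches_per_pass` and `total_batches` are hypothetical, and the sketch only counts batches for a dataset built as `.batch(batch_size).repeat(repeat_count)`; the exact rule SLModel uses to derive `steps_per_epoch` is not shown in this document and is not assumed here.

```python
import math


def batches_per_pass(num_rows, batch_size):
    """Number of batches in one pass over the data (the last batch may be partial)."""
    return math.ceil(num_rows / batch_size)


def total_batches(num_rows, batch_size, repeat_count):
    """Batches yielded when the dataset is batched first and then repeated."""
    return batches_per_pass(num_rows, batch_size) * repeat_count


# Values used later in this tutorial: 1000 sampled rows, batch size 128, repeat 5.
per_pass = batches_per_pass(1000, 128)
overall = total_batches(1000, 128, 5)
print(per_pass, overall)
```

With these numbers one pass yields 8 batches (seven full batches of 128 rows plus one of 104), so a dataset repeated 5 times holds 40 batches in total. Setting the repeat count equal to the number of epochs, as this tutorial does, leaves one full pass available per epoch.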

### 1. Download the data and convert it to CSV

In [3]:
%%capture
%%!
wget https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/movielens/ml-1m.zip
unzip ./ml-1m.zip 

E0407 17:58:18.231911774  122073 fork_posix.cc:76]           Other threads are currently calling into gRPC, skipping fork() handlers


Read the `.dat` files and convert them into dictionaries

In [4]:
def load_data(filename, columns):
    data = {}
    with open(filename, "r", encoding="unicode_escape") as f:
        for line in f:
            ls = line.strip("\n").split("::")
            data[ls[0]] = dict(zip(columns[1:], ls[1:]))
    return data
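
The following sketch shows what the parsing step does to a single `::`-delimited line, and what happens when several lines share the same first field. The helper `parse_dat_line` is a hypothetical stand-in that repeats the per-line logic of `load_data`; the sample lines are taken from the CSV output shown later in this document.

```python
def parse_dat_line(line, columns):
    """Split one '::'-delimited line into (key, {column: value})."""
    fields = line.strip("\n").split("::")
    # The first field becomes the key; the remaining fields are zipped
    # with the remaining column names.
    return fields[0], dict(zip(columns[1:], fields[1:]))


user_columns = ["UserID", "Gender", "Age", "Occupation", "Zip-code"]
key, record = parse_dat_line("1::F::1::10::48067\n", user_columns)
print(key, record)

# Two ratings by the same user collapse into one entry, because the
# dictionary is keyed by the first column (UserID).
ratings_columns = ["UserID", "MovieID", "Rating", "Timestamp"]
ratings = {}
for line in ["1::1193::5::978300760\n", "1::661::3::978302109\n"]:
    k, rec = parse_dat_line(line, ratings_columns)
    ratings[k] = rec
print(len(ratings), ratings["1"]["MovieID"])
```

All values stay strings at this stage. Keying by the first column works well for `users.dat` and `movies.dat`, where that column is unique, but it keeps only the last line per user for `ratings.dat`.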

In [5]:
users_data = load_data(
        "./ml-1m/users.dat",
        columns=["UserID", "Gender", "Age", "Occupation", "Zip-code"],
    )
movies_data = load_data(
        "./ml-1m/movies.dat", columns=["MovieID", "Title", "Genres"]
    )
ratings_columns = ["UserID", "MovieID", "Rating", "Timestamp"]

rating_data = load_data(
        "./ml-1m/ratings.dat", columns = ratings_columns
)

In [6]:
print(len(users_data))
print(len(movies_data))
print(len(rating_data))

6040
3883
6040

*Note: `load_data` keys every record by its first column, so `rating_data` keeps only the last rating of each user, which is why it has 6040 entries. The join below reads `ratings.dat` line by line instead of using `rating_data`.*


Next we join the user, movie, and rating records, split the joined columns by party, and write them to `alice_ml1m.csv` and `bob_ml1m.csv`.

In [7]:
fed_csv = {
    alice: "alice_ml1m.csv",
    bob: "bob_ml1m.csv"
}
csv_writer_container = {
    alice: open(fed_csv[alice], "w"),
    bob: open(fed_csv[bob], "w")
}
part_columns = {
    alice:["UserID", "Gender", "Age", "Occupation", "Zip-code"],
    bob:["MovieID", "Rating", "Title", "Genres", "Timestamp"]
}

In [8]:
for device, writer in csv_writer_container.items():
    writer.write("ID," + ",".join(part_columns[device]) + "\n")

In [9]:
def _parse_example(feature, columns, index):
    # Commas would break the CSV layout, so replace them in titles.
    if "Title" in feature:
        feature["Title"] = feature["Title"].replace(",", "_")
    # Separate genres with spaces so they can be split on whitespace later.
    if "Genres" in feature:
        feature["Genres"] = feature["Genres"].replace("|", " ")
    values = [str(index)]
    for c in columns:
        values.append(feature[c])
    return ",".join(values)

index = 0
num_sample = 1000  # keep only the first 1000 ratings for this demo
with open("ml-1m/ratings.dat", "r", encoding="unicode_escape") as f:
    for line in f:
        ls = line.strip().split("::")
        rating = dict(zip(ratings_columns, ls))
        # Join the rating with its user and movie records.
        rating.update(users_data.get(ls[0]))
        rating.update(movies_data.get(ls[1]))
        # Write each party's columns to that party's file under a shared ID.
        for device, columns in part_columns.items():
            parse_f = _parse_example(rating, columns, index)
            csv_writer_container[device].write(parse_f + "\n")
        index += 1
        if num_sample > 0 and index >= num_sample:
            break
for w in csv_writer_container.values():
    w.close()
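
The cell above replaces commas in movie titles with underscores so that hand-joined lines remain valid CSV. As a possible alternative, the sketch below uses Python's standard `csv` module, which quotes any field that contains a comma. It assumes the downstream reader understands standard CSV quoting; the sample row and the reduced column list are illustrative only.

```python
import csv
import io

columns = ["ID", "MovieID", "Title", "Genres"]
row = {
    "ID": "4",
    "MovieID": "2355",
    "Title": "Bug's Life, A (1998)",  # original title, comma included
    "Genres": "Animation Children's Comedy",
}

# Write to an in-memory buffer; csv.writer quotes the title automatically.
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerow(columns)
writer.writerow([row[c] for c in columns])

# Read the text back to confirm the title survives the round trip.
buf.seek(0)
parsed = list(csv.DictReader(buf))
print(parsed[0]["Title"])
```

This keeps the original titles intact at the cost of quoted fields in the file. The underscore replacement used in the tutorial produces files that can be split on commas without a CSV parser.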

## Data processing and splitting are now complete
The outputs are:
```
alice: alice_ml1m.csv
bob: bob_ml1m.csv
```

In [10]:
! head alice_ml1m.csv

ID,UserID,Gender,Age,Occupation,Zip-code
0,1,F,1,10,48067
1,1,F,1,10,48067
2,1,F,1,10,48067
3,1,F,1,10,48067
4,1,F,1,10,48067
5,1,F,1,10,48067
6,1,F,1,10,48067
7,1,F,1,10,48067
8,1,F,1,10,48067


In [11]:
! head bob_ml1m.csv

ID,MovieID,Rating,Title,Genres,Timestamp
0,1193,5,One Flew Over the Cuckoo's Nest (1975),Drama,978300760
1,661,3,James and the Giant Peach (1996),Animation Children's Musical,978302109
2,914,3,My Fair Lady (1964),Musical Romance,978301968
3,3408,4,Erin Brockovich (2000),Drama,978300275
4,2355,5,Bug's Life_ A (1998),Animation Children's Comedy,978824291
5,1197,3,Princess Bride_ The (1987),Action Adventure Comedy Romance,978302268
6,1287,5,Ben-Hur (1959),Action Adventure Drama,978302039
7,2804,5,Christmas Story_ A (1983),Comedy Drama,978300719
8,594,4,Snow White and the Seven Dwarfs (1937),Animation Children's Musical,978302268
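
Since vertical training relies on both files describing the same samples in the same order, a quick check of the shared `ID` column can catch mistakes early. The sketch below is illustrative only: `read_ids` is a hypothetical helper, and it works on small in-memory CSV strings rather than the real files.

```python
import csv
import io


def read_ids(csv_text, key="ID"):
    """Return the values of the key column, in file order."""
    return [row[key] for row in csv.DictReader(io.StringIO(csv_text))]


alice_csv = "ID,UserID,Gender\n0,1,F\n1,1,F\n2,1,F\n"
bob_csv = "ID,MovieID,Rating\n0,1193,5\n1,661,3\n2,914,3\n"
shuffled_bob_csv = "ID,MovieID,Rating\n1,661,3\n0,1193,5\n2,914,3\n"

# Aligned: same IDs in the same order on both sides.
aligned = read_ids(alice_csv) == read_ids(bob_csv)
# Not aligned: same IDs, but the rows are in a different order.
misaligned = read_ids(alice_csv) == read_ids(shuffled_bob_csv)
print(aligned, misaligned)
```

For the real files the same comparison can be made by passing the file contents, for example `open("alice_ml1m.csv").read()`, to `read_ids`.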


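### 1. Develop the DataBuilder functions with the plaintext engine  
Because each party in SLModel holds different data, a DataBuilder has to be developed for each party separately.

#### Develop Alice's DataBuilder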

In [12]:
import pandas as pd
alice_df = pd.read_csv("alice_ml1m.csv",encoding="utf-8")

In [13]:
alice_df["UserID"] = alice_df["UserID"].astype("string")
alice_df = alice_df.drop(columns="ID")
alice_df

Unnamed: 0,UserID,Gender,Age,Occupation,Zip-code
0,1,F,1,10,48067
1,1,F,1,10,48067
2,1,F,1,10,48067
3,1,F,1,10,48067
4,1,F,1,10,48067
...,...,...,...,...,...
995,10,F,35,1,95370
996,10,F,35,1,95370
997,10,F,35,1,95370
998,10,F,35,1,95370


In [14]:
import tensorflow as tf
alice_dict = dict(alice_df)
data_set = (
    tf.data.Dataset.from_tensor_slices(alice_dict).batch(32).repeat(1)
)

In [15]:
data_set

<RepeatDataset element_spec={'UserID': TensorSpec(shape=(None,), dtype=tf.string, name=None), 'Gender': TensorSpec(shape=(None,), dtype=tf.string, name=None), 'Age': TensorSpec(shape=(None,), dtype=tf.int64, name=None), 'Occupation': TensorSpec(shape=(None,), dtype=tf.int64, name=None), 'Zip-code': TensorSpec(shape=(None,), dtype=tf.int64, name=None)}>

#### Develop Bob's DataBuilder


In [16]:
bob_df = pd.read_csv("bob_ml1m.csv",encoding="utf-8")
bob_df = bob_df.drop(columns="ID")

bob_df["MovieID"] = bob_df["MovieID"].astype("string")

bob_df

Unnamed: 0,MovieID,Rating,Title,Genres,Timestamp
0,1193,5,One Flew Over the Cuckoo's Nest (1975),Drama,978300760
1,661,3,James and the Giant Peach (1996),Animation Children's Musical,978302109
2,914,3,My Fair Lady (1964),Musical Romance,978301968
3,3408,4,Erin Brockovich (2000),Drama,978300275
4,2355,5,Bug's Life_ A (1998),Animation Children's Comedy,978824291
...,...,...,...,...,...
995,3704,2,Mad Max Beyond Thunderdome (1985),Action Sci-Fi,978228364
996,1020,3,Cool Runnings (1993),Comedy,978228726
997,784,3,Cable Guy_ The (1996),Comedy,978230946
998,858,3,Godfather_ The (1972),Action Crime Drama,978224375


In [17]:
label = bob_df["Rating"]
data = bob_df.drop(columns="Rating")

def _parse_bob(row_sample, label):
    import tensorflow as tf

    y_t = label
    y = tf.expand_dims(
        tf.where(
            y_t > 3,
            tf.ones_like(y_t, dtype=tf.float32),
            tf.zeros_like(y_t, dtype=tf.float32),
        ),
        axis=1,
    )
    return row_sample, y
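
To show what the thresholding in `_parse_bob` produces, here is a plain-Python sketch of the same rule without TensorFlow. The helper `binarize_ratings` is hypothetical; it mirrors `tf.where(y_t > 3, 1.0, 0.0)` followed by `tf.expand_dims(..., axis=1)`, using nested lists in place of tensors.

```python
def binarize_ratings(ratings, threshold=3):
    """Map each rating to [1.0] if it is above the threshold, else [0.0].

    Wrapping every label in its own list gives the (batch, 1) shape that
    expand_dims(axis=1) produces for a tensor.
    """
    return [[1.0] if r > threshold else [0.0] for r in ratings]


# Ratings of the first five rows in bob_ml1m.csv.
labels = binarize_ratings([5, 3, 3, 4, 5])
print(labels)
```

Ratings of 4 and 5 become positive labels and ratings of 1 to 3 become negative labels, which turns the task into binary classification.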

In [18]:
bob_dict = (dict(data), label)
data_set = tf.data.Dataset.from_tensor_slices(bob_dict).batch(32).repeat(1)

In [19]:
data_set = data_set.map(_parse_bob)

In [20]:
next(iter(data_set))

({'MovieID': <tf.Tensor: shape=(32,), dtype=string, numpy=
  array([b'1193', b'661', b'914', b'3408', b'2355', b'1197', b'1287',
         b'2804', b'594', b'919', b'595', b'938', b'2398', b'2918', b'1035',
         b'2791', b'2687', b'2018', b'3105', b'2797', b'2321', b'720',
         b'1270', b'527', b'2340', b'48', b'1097', b'1721', b'1545', b'745',
         b'2294', b'3186'], dtype=object)>,
  'Title': <tf.Tensor: shape=(32,), dtype=string, numpy=
  array([b"One Flew Over the Cuckoo's Nest (1975)",
         b'James and the Giant Peach (1996)', b'My Fair Lady (1964)',
         b'Erin Brockovich (2000)', b"Bug's Life_ A (1998)",
         b'Princess Bride_ The (1987)', b'Ben-Hur (1959)',
         b'Christmas Story_ A (1983)',
         b'Snow White and the Seven Dwarfs (1937)',
         b'Wizard of Oz_ The (1939)', b'Beauty and the Beast (1991)',
         b'Gigi (1958)', b'Miracle on 34th Street (1947)',
         b"Ferris Bueller's Day Off (1986)", b'Sound of Music_ The (1965)',
       

### 2. Wrap each party's DataBuilder

In [21]:
# alice 
def create_dataset_builder_alice(
    batch_size=128,
    repeat_count=5,
):
    def dataset_builder(x):
        import pandas as pd
        import tensorflow as tf

        x = [dict(t) if isinstance(t, pd.DataFrame) else t for t in x]
        x = x[0] if len(x) == 1 else tuple(x)
        data_set = (
            tf.data.Dataset.from_tensor_slices(x).batch(batch_size).repeat(repeat_count)
        )

        return data_set

    return dataset_builder

# bob
def create_dataset_builder_bob(
    batch_size=128,
    repeat_count=5,
):
    def _parse_bob(row_sample, label):
        import tensorflow as tf

        y_t = label["Rating"]
        y = tf.expand_dims(
            tf.where(
                y_t > 3,
                tf.ones_like(y_t, dtype=tf.float32),
                tf.zeros_like(y_t, dtype=tf.float32),
            ),
            axis=1,
        )
        return row_sample, y

    def dataset_builder(x):
        import pandas as pd
        import tensorflow as tf

        x = [dict(t) if isinstance(t, pd.DataFrame) else t for t in x]
        x = x[0] if len(x) == 1 else tuple(x)
        data_set = (
            tf.data.Dataset.from_tensor_slices(x).batch(batch_size).repeat(repeat_count)
        )

        data_set = data_set.map(_parse_bob)

        return data_set

    return dataset_builder
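
The two builders share the line `x = x[0] if len(x) == 1 else tuple(x)`. The sketch below isolates that step with plain dictionaries standing in for DataFrames. It assumes, based on how the builders above index their input, that a party holding only features receives a one-item list and that the label-holding party receives features followed by the label; `normalize_inputs` is a hypothetical name for this step.

```python
def normalize_inputs(x):
    """One item -> the item itself; several items -> a tuple of them."""
    return x[0] if len(x) == 1 else tuple(x)


# Stand-ins for dict(DataFrame): column name -> column values.
alice_features = {"UserID": ["1", "1"], "Gender": ["F", "F"]}
bob_features = {"MovieID": ["1193", "661"], "Genres": ["Drama", "Animation"]}
bob_label = {"Rating": [5, 3]}

# Alice holds only features, so her dataset elements are plain feature dicts.
alice_input = normalize_inputs([alice_features])

# Bob holds features and the label, so his elements are (features, label) pairs.
bob_input = normalize_inputs([bob_features, bob_label])
features, label = bob_input
print(sorted(alice_input), label["Rating"])
```

Under this assumption the label reaches the wrapped `_parse_bob` as a dictionary, which would explain why it reads `label["Rating"]` while the standalone version in the previous step compares `label` directly.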

### 3. Build the data_builder_dict

In [22]:
data_builder_dict = {
        alice: create_dataset_builder_alice(
            batch_size=128,
            repeat_count=5,
        ),
        bob: create_dataset_builder_bob(
            batch_size=128,
            repeat_count=5,
        ),
    }

## Next, define the DeepFM model and run the data through it

### Define the DeepFM model

In [23]:
from secretflow.ml.nn.applications.sl_deep_fm import DeepFMbase, DeepFMfuse
from secretflow.ml.nn import SLModel
NUM_USERS = 6040
NUM_MOVIES = 3952
GENDER_VOCAB = ["F", "M"]
AGE_VOCAB = [1, 18, 25, 35, 45, 50, 56]
OCCUPATION_VOCAB = [i for i in range(21)]
GENRES_VOCAB = [
    "Action",
    "Adventure",
    "Animation",
    "Children's",
    "Comedy",
    "Crime",
    "Documentary",
    "Drama",
    "Fantasy",
    "Film-Noir",
    "Horror",
    "Musical",
    "Mystery",
    "Romance",
    "Sci-Fi",
    "Thriller",
    "War",
    "Western",
]

### Define Alice's base net

In [24]:
def create_base_model_alice():

    # Create model
    def create_model():
        import tensorflow as tf

        def preprocess():
            inputs = {
                "UserID": tf.keras.Input(shape=(1,), dtype=tf.string),
                "Gender": tf.keras.Input(shape=(1,), dtype=tf.string),
                "Age": tf.keras.Input(shape=(1,), dtype=tf.int64),
                "Occupation": tf.keras.Input(shape=(1,), dtype=tf.int64),
            }
            user_id_output = tf.keras.layers.Hashing(
                num_bins=NUM_USERS, output_mode="one_hot"
            )
            user_gender_output = tf.keras.layers.StringLookup(
                vocabulary=GENDER_VOCAB, output_mode="one_hot"
            )

            user_age_out = tf.keras.layers.IntegerLookup(
                vocabulary=AGE_VOCAB, output_mode="one_hot"
            )
            user_occupation_out = tf.keras.layers.IntegerLookup(
                vocabulary=OCCUPATION_VOCAB, output_mode="one_hot"
            )

            outputs = {
                "UserID": user_id_output(inputs["UserID"]),
                "Gender": user_gender_output(inputs["Gender"]),
                "Age": user_age_out(inputs["Age"]),
                "Occupation": user_occupation_out(inputs["Occupation"]),
            }
            return tf.keras.Model(inputs=inputs, outputs=outputs)

        preprocess_layer = preprocess()
        model = DeepFMbase(
            dnn_units_size=[256, 32],
            preprocess_layer=preprocess_layer,
        )
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(),
            metrics=[
                tf.keras.metrics.AUC(),
                tf.keras.metrics.Precision(),
                tf.keras.metrics.Recall(),
            ],
        )
        return model  # need wrap

    return create_model

### Define Bob's base net and the fuse net

In [25]:
# bob model
def create_base_model_bob():
    # Create model
    def create_model():
        import tensorflow as tf
        
        # define preprocess layer
        def preprocess():
            inputs = {
                "MovieID": tf.keras.Input(shape=(1,), dtype=tf.string),
                "Genres": tf.keras.Input(shape=(1,), dtype=tf.string),
            }

            movie_id_out = tf.keras.layers.Hashing(
                num_bins=NUM_MOVIES, output_mode="one_hot"
            )
            movie_genres_out = tf.keras.layers.TextVectorization(
                output_mode='multi_hot', split="whitespace", vocabulary=GENRES_VOCAB
            )
            outputs = {
                "MovieID": movie_id_out(inputs["MovieID"]),
                "Genres": movie_genres_out(inputs["Genres"]),
            }
            return tf.keras.Model(inputs=inputs, outputs=outputs)

        preprocess_layer = preprocess()

        model = DeepFMbase(
            dnn_units_size=[256, 32],
            preprocess_layer=preprocess_layer,
        )
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(),
            metrics=[
                tf.keras.metrics.AUC(),
                tf.keras.metrics.Precision(),
                tf.keras.metrics.Recall(),
            ],
        )
        return model  # need wrap

    return create_model

def create_fuse_model():
    # Create model
    def create_model():
        import tensorflow as tf

        model = DeepFMfuse(dnn_units_size=[256, 256, 32])
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(),
            metrics=[
                tf.keras.metrics.AUC(),
                tf.keras.metrics.Precision(),
                tf.keras.metrics.Recall(),
            ],
        )
        return model

    return create_model

In [26]:
base_model_dict = {
    alice: create_base_model_alice(), 
    bob: create_base_model_bob()
}
model_fuse = create_fuse_model()

In [27]:
from secretflow.data.vertical import read_csv as v_read_csv

vdf = v_read_csv(
    {alice:"alice_ml1m.csv",
     bob:"bob_ml1m.csv"},
    keys="ID",
    drop_keys="ID")
label = vdf["Rating"]

data = vdf.drop(columns=["Rating", "Timestamp", "Title", "Zip-code"])
data["UserID"] = data["UserID"].astype("string")
data["MovieID"] = data["MovieID"].astype("string")

sl_model = SLModel(
            base_model_dict=base_model_dict,
            device_y=bob,
            model_fuse=model_fuse,
        )
history = sl_model.fit(
    data,
    label,
    epochs=5,
    batch_size=128,
    random_seed=1234,
    dataset_builder=data_builder_dict,
)

INFO:root:Create proxy actor <class 'secretflow.ml.nn.sl.backend.tensorflow.sl_base.PYUSLTFModel'> with party alice.
INFO:root:Create proxy actor <class 'secretflow.ml.nn.sl.backend.tensorflow.sl_base.PYUSLTFModel'> with party bob.
INFO:root:SL Train Params: {'self': <secretflow.ml.nn.sl.sl_model.SLModel object at 0x7ff1d2132d90>, 'x': VDataFrame(partitions={alice: Partition(data=<secretflow.device.device.pyu.PYUObject object at 0x7ff2e8d4fe80>), bob: Partition(data=<secretflow.device.device.pyu.PYUObject object at 0x7ff2e8d4f610>)}, aligned=True), 'y': VDataFrame(partitions={bob: Partition(data=<secretflow.device.device.pyu.PYUObject object at 0x7ff2e8d4f4c0>)}, aligned=True), 'batch_size': 128, 'epochs': 5, 'verbose': 1, 'callbacks': None, 'validation_data': None, 'shuffle': False, 'sample_weight': None, 'validation_freq': 1, 'dp_spent_step_freq': None, 'dataset_builder': {alice: <function create_dataset_builder_alice.<locals>.dataset_builder at 0x7ff2e8f6ae50>, bob: <function create