# Customize DataBuilder on SLModel in SecretFlow

The following codes are demos only. It's **NOT for production** due to system security concerns, please **DO NOT** use it directly in production. 

We support federated learning in vertical scenarios on SLModel in SecretFlow. In this tutorial, we will take DeepFM model for recommendation as an example to introduce how to customize a DataBuilder on SLModel. 

The main goal of this tutorial is to train DeepFM to show how to customize a DataBuilder on SLModel. 

*If you want to learn more about related content for DeepFM model and split plans, please move to related documents for DeepFM model.* 

## Environment Setting

In [7]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [8]:
import secretflow as sf

# Check the version of your SecretFlow
print('The version of SecretFlow: {}'.format(sf.__version__))

# In case you have a running secretflow runtime already.
sf.shutdown()
sf.init(['alice', 'bob', 'charlie'], address="local", log_to_driver=False)
alice, bob, charlie = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('charlie')

The version of SecretFlow: 1.4.0.dev20240105


2024-01-10 06:43:25,375	INFO worker.py:1538 -- Started a local Ray instance.


## Datasets

We will use the classic dataset [MovieLens](https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/movielens/ml-1m.zip) for introduction. MovieLens is an open recommendation system dataset that contains movie ratings and movie metadata information. 

And we split data into several pieces: 

- alice: "UserID", "Gender", "Age", "Occupation", "Zip-code" 
- bob: "MovieID", "Rating", "Title", "Genres", "Timestamp" 


In [9]:
## Define DataBuilders

We customize a DataBuilder with the aim to meet higher level customization demands, as the existing FedDataFrame and FedNdarray did not provide the required functionality. 

In Split Learning(where all sides of the data need to be aligned), we only provide DataBuilder for CSV data for the time being. You can define what to do with each row of data in the custom DataBuilder. The operations in SLModel depend on the way you define it. 

The things you should notice here: 

- We read MovieLens dataset in CSV format. However, it needs to be treated as sparse features. The DataBuilder will convert each column into the form of dictionary. 
- Bob's score was processed into binary form using threshold. 
- Custom functions can be defined in the DataBuilder and then applied to each column using dataset.map. 
- Only CSV format is supported in the vertical scenario due to the restriction that both sides of the data should be aligned. 
- It returns Dataset which has been defined Batch_size and Repeat in the CSV mode so that SLModel can infer the Steps_per_epoch according to the Dataset. 



### 1. Download the dataset and convert it to the CSV format.

In [15]:
%%capture
%%!
wget https://secretflow-data.oss-accelerate.aliyuncs.com/datasets/movielens/ml-1m.zip
unzip ./ml-1m.zip 

Read the data from the dat format and convert it to the dictionary form. 

In [16]:
def load_data(filename, columns):
    data = {}
    with open(filename, "r", encoding="unicode_escape") as f:
        for line in f:
            ls = line.strip("\n").split("::")
            data[ls[0]] = dict(zip(columns[1:], ls[1:]))
    return data

In [17]:
users_data = load_data(
    "./ml-1m/users.dat",
    columns=["UserID", "Gender", "Age", "Occupation", "Zip-code"],
)
movies_data = load_data("./ml-1m/movies.dat", columns=["MovieID", "Title", "Genres"])
ratings_columns = ["UserID", "MovieID", "Rating", "Timestamp"]

rating_data = load_data("./ml-1m/ratings.dat", columns=ratings_columns)

FileNotFoundError: [Errno 2] No such file or directory: './ml-1m/users.dat'

In [13]:
print(len(users_data))
print(len(movies_data))
print(len(rating_data))

NameError: name 'users_data' is not defined

Next, we join user, movie and rating, split up and assemble them into 'alice_ml1m.csv' and 'bob_ml1m.csv'. 

In [None]:
fed_csv = {alice: "alice_ml1m.csv", bob: "bob_ml1m.csv"}
csv_writer_container = {alice: open(fed_csv[alice], "w"), bob: open(fed_csv[bob], "w")}
part_columns = {
    alice: ["UserID", "Gender", "Age", "Occupation", "Zip-code"],
    bob: ["MovieID", "Rating", "Title", "Genres", "Timestamp"],
}

In [None]:
for device, writer in csv_writer_container.items():
    writer.write("ID," + ",".join(part_columns[device]) + "\n")

In [None]:
f = open("ml-1m/ratings.dat", "r", encoding="unicode_escape")


def _parse_example(feature, columns, index):
    if "Title" in feature.keys():
        feature["Title"] = feature["Title"].replace(",", "_")
    if "Genres" in feature.keys():
        feature["Genres"] = feature["Genres"].replace("|", " ")
    values = []
    values.append(str(index))
    for c in columns:
        values.append(feature[c])
    return ",".join(values)


index = 0
num_sample = 1000
for line in f:
    ls = line.strip().split("::")
    rating = dict(zip(ratings_columns, ls))
    rating.update(users_data.get(ls[0]))
    rating.update(movies_data.get(ls[1]))
    for device, columns in part_columns.items():
        parse_f = _parse_example(rating, columns, index)
        csv_writer_container[device].write(parse_f + "\n")
    index += 1
    if num_sample > 0 and index >= num_sample:
        break
for w in csv_writer_container.values():
    w.close()

## So we've done the data processing and splitting up by now. 

And we output two files:  
```
Alice: alice_ml1m.csv and Bob: bob_ml1m.csv 
```

In [None]:
! head alice_ml1m.csv

In [None]:
! head bob_ml1m.csv

### 1. Use plaintext engines to develop Databuilders. 

Because the data for each side of the SLModel is different, DataBuilder needs to be developed separately. 

#### Develop Alice's DataBuilder

In [None]:
import pandas as pd

alice_df = pd.read_csv("alice_ml1m.csv", encoding="utf-8")

In [None]:
alice_df["UserID"] = alice_df["UserID"].astype("string")
alice_df = alice_df.drop(columns="ID")
alice_df

In [None]:
import tensorflow as tf

alice_dict = dict(alice_df)
data_set = tf.data.Dataset.from_tensor_slices(alice_dict).batch(32).repeat(1)

In [None]:
data_set

#### Develop Bob's DataBuilder

In [None]:
bob_df = pd.read_csv("bob_ml1m.csv", encoding="utf-8")
bob_df = bob_df.drop(columns="ID")

bob_df["MovieID"] = bob_df["MovieID"].astype("string")

bob_df

In [None]:
label = bob_df["Rating"]
data = bob_df.drop(columns="Rating")


def _parse_bob(row_sample, label):
    import tensorflow as tf

    y_t = label
    y = tf.expand_dims(
        tf.where(
            y_t > 3,
            tf.ones_like(y_t, dtype=tf.float32),
            tf.zeros_like(y_t, dtype=tf.float32),
        ),
        axis=1,
    )
    return row_sample, y

In [None]:
bob_dict = tuple([dict(data), label])
data_set = tf.data.Dataset.from_tensor_slices(bob_dict).batch(32).repeat(1)

In [None]:
data_set = data_set.map(_parse_bob)

In [None]:
next(iter(data_set))

### 2. Wrap their DataBuilders separately.

In [None]:
# alice
def create_dataset_builder_alice(
    batch_size=128,
    repeat_count=5,
):
    def dataset_builder(x):
        import pandas as pd
        import tensorflow as tf

        x = [dict(t) if isinstance(t, pd.DataFrame) else t for t in x]
        x = x[0] if len(x) == 1 else tuple(x)
        data_set = (
            tf.data.Dataset.from_tensor_slices(x).batch(batch_size).repeat(repeat_count)
        )

        return data_set

    return dataset_builder


# bob
def create_dataset_builder_bob(
    batch_size=128,
    repeat_count=5,
):
    def _parse_bob(row_sample, label):
        import tensorflow as tf

        y_t = label["Rating"]
        y = tf.expand_dims(
            tf.where(
                y_t > 3,
                tf.ones_like(y_t, dtype=tf.float32),
                tf.zeros_like(y_t, dtype=tf.float32),
            ),
            axis=1,
        )
        return row_sample, y

    def dataset_builder(x):
        import pandas as pd
        import tensorflow as tf

        x = [dict(t) if isinstance(t, pd.DataFrame) else t for t in x]
        x = x[0] if len(x) == 1 else tuple(x)
        data_set = (
            tf.data.Dataset.from_tensor_slices(x).batch(batch_size).repeat(repeat_count)
        )

        data_set = data_set.map(_parse_bob)

        return data_set

    return dataset_builder

### 3. Construct a databuilder_dict.

In [None]:
data_builder_dict = {
    alice: create_dataset_builder_alice(
        batch_size=128,
        repeat_count=5,
    ),
    bob: create_dataset_builder_bob(
        batch_size=128,
        repeat_count=5,
    ),
}

## Define DeepFM Model and run it

### 1. Define a DeepFM model. 

In [None]:
from secretflow.ml.nn.applications.sl_deep_fm import DeepFMbase, DeepFMfuse
from secretflow.ml.nn import SLModel

NUM_USERS = 6040
NUM_MOVIES = 3952
GENDER_VOCAB = ["F", "M"]
AGE_VOCAB = [1, 18, 25, 35, 45, 50, 56]
OCCUPATION_VOCAB = [i for i in range(21)]
GENRES_VOCAB = [
    "Action",
    "Adventure",
    "Animation",
    "Children's",
    "Comedy",
    "Crime",
    "Documentary",
    "Drama",
    "Fantasy",
    "Film-Noir",
    "Horror",
    "Musical",
    "Mystery",
    "Romance",
    "Sci-Fi",
    "Thriller",
    "War",
    "Western",
]

### 2. Define Alice's basenet.

In [None]:
def create_base_model_alice():
    # Create model
    def create_model():
        import tensorflow as tf

        def preprocess():
            inputs = {
                "UserID": tf.keras.Input(shape=(1,), dtype=tf.string),
                "Gender": tf.keras.Input(shape=(1,), dtype=tf.string),
                "Age": tf.keras.Input(shape=(1,), dtype=tf.int64),
                "Occupation": tf.keras.Input(shape=(1,), dtype=tf.int64),
            }
            user_id_output = tf.keras.layers.Hashing(
                num_bins=NUM_USERS, output_mode="one_hot"
            )
            user_gender_output = tf.keras.layers.StringLookup(
                vocabulary=GENDER_VOCAB, output_mode="one_hot"
            )

            user_age_out = tf.keras.layers.IntegerLookup(
                vocabulary=AGE_VOCAB, output_mode="one_hot"
            )
            user_occupation_out = tf.keras.layers.IntegerLookup(
                vocabulary=OCCUPATION_VOCAB, output_mode="one_hot"
            )

            outputs = {
                "UserID": user_id_output(inputs["UserID"]),
                "Gender": user_gender_output(inputs["Gender"]),
                "Age": user_age_out(inputs["Age"]),
                "Occupation": user_occupation_out(inputs["Occupation"]),
            }
            return tf.keras.Model(inputs=inputs, outputs=outputs)

        preprocess_layer = preprocess()
        model = DeepFMbase(
            dnn_units_size=[256, 32],
            preprocess_layer=preprocess_layer,
        )
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(),
            metrics=[
                tf.keras.metrics.AUC(),
                tf.keras.metrics.Precision(),
                tf.keras.metrics.Recall(),
            ],
        )
        return model  # need wrap

    return create_model

### 3. Define Bob's basenet and fusenet. 

In [None]:
# bob model
def create_base_model_bob():
    # Create model
    def create_model():
        import tensorflow as tf

        # define preprocess layer
        def preprocess():
            inputs = {
                "MovieID": tf.keras.Input(shape=(1,), dtype=tf.string),
                "Genres": tf.keras.Input(shape=(1,), dtype=tf.string),
            }

            movie_id_out = tf.keras.layers.Hashing(
                num_bins=NUM_MOVIES, output_mode="one_hot"
            )
            movie_genres_out = tf.keras.layers.TextVectorization(
                output_mode='multi_hot', split="whitespace", vocabulary=GENRES_VOCAB
            )
            outputs = {
                "MovieID": movie_id_out(inputs["MovieID"]),
                "Genres": movie_genres_out(inputs["Genres"]),
            }
            return tf.keras.Model(inputs=inputs, outputs=outputs)

        preprocess_layer = preprocess()

        model = DeepFMbase(
            dnn_units_size=[256, 32],
            preprocess_layer=preprocess_layer,
        )
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(),
            metrics=[
                tf.keras.metrics.AUC(),
                tf.keras.metrics.Precision(),
                tf.keras.metrics.Recall(),
            ],
        )
        return model  # need wrap

    return create_model


def create_fuse_model():
    # Create model
    def create_model():
        import tensorflow as tf

        model = DeepFMfuse(dnn_units_size=[256, 256, 32])
        model.compile(
            loss=tf.keras.losses.binary_crossentropy,
            optimizer=tf.keras.optimizers.Adam(),
            metrics=[
                tf.keras.metrics.AUC(),
                tf.keras.metrics.Precision(),
                tf.keras.metrics.Recall(),
            ],
        )
        return model

    return create_model

In [None]:
base_model_dict = {alice: create_base_model_alice(), bob: create_base_model_bob()}
model_fuse = create_fuse_model()

In [None]:
from secretflow.data.vertical import read_csv as v_read_csv

vdf = v_read_csv(
    {alice: "alice_ml1m.csv", bob: "bob_ml1m.csv"}, keys="ID", drop_keys="ID"
)
label = vdf["Rating"]

data = vdf.drop(columns=["Rating", "Timestamp", "Title", "Zip-code"])
data["UserID"] = data["UserID"].astype("string")
data["MovieID"] = data["MovieID"].astype("string")

sl_model = SLModel(
    base_model_dict=base_model_dict,
    device_y=bob,
    model_fuse=model_fuse,
)
history = sl_model.fit(
    data,
    label,
    epochs=5,
    batch_size=128,
    random_seed=1234,
    dataset_builder=data_builder_dict,
)