In [25]:
import pandas as pd
import numpy as np
import torch
import sys
import torch.nn as nn
import traceback
import pickle
import random
import torch.nn.functional as F
import os
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sentence_transformers import SentenceTransformer
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from deepctr_torch.inputs import SparseFeat, VarLenSparseFeat, get_feature_names
from deepctr_torch.models.din import DIN
from deepctr_torch.models import DeepFM
import flwr as fl
from collections import OrderedDict
from typing import List, Tuple
from flwr.common import (
    Code,
    EvaluateIns,
    EvaluateRes,
    FitIns,
    FitRes,
    GetParametersIns,
    GetParametersRes,
    Status,
    ndarrays_to_parameters,
    parameters_to_ndarrays,
)

# Seed every RNG this notebook touches (NumPy, Python `random`, PyTorch) so that
# user shuffles, train/test splits and model weight initialisation are reproducible.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)
torch.manual_seed(SEED)  # also seeds CUDA generators

In [26]:
class DatasetCFG:
    """Where the ratings data lives on disk."""

    # Directory containing `ratings_data_process.csv` (the name suggests MovieLens-1M).
    data_root: str = 'ml-1m'


class FedCFG:
    """Federated-learning simulation settings."""

    # Number of simulated clients the users are partitioned into.
    num_clients: int = 10

### 获取训练数据以及辅助函数


In [27]:
def split(x, key2index=None):
    """Encode a pipe-separated string (e.g. a movie's ``genres``) as 1-based ids.

    Index 0 is never assigned because it is the padding value for sequence
    inputs. Pass the same ``key2index`` dict on every call to build one
    vocabulary shared across rows; unseen keys are added to it in place.
    With the default ``None`` a fresh dict is used, so ids are only
    consistent within this single string.

    Args:
        x: string such as ``"Action|Comedy"``.
        key2index: optional ``{key: id}`` mapping, mutated in place.

    Returns:
        1-D integer ``torch.Tensor`` with one id per key, in input order.
    """
    if key2index is None:
        key2index = {}
    keys = x.split('|')
    for key in keys:
        if key not in key2index:
            # Notice : input value 0 is a special "padding", so valid ids start at 1.
            key2index[key] = len(key2index) + 1
    return torch.tensor([key2index[key] for key in keys])


def get_data():
    """Load the ratings CSV and build DeepCTR feature columns for centralised training.

    Each column in ``sparse_features`` is label-encoded in place. The
    pipe-separated ``genres`` column is encoded into padded id sequences using
    one vocabulary shared across all rows (id 0 is reserved for padding).

    Returns:
        data: the encoded DataFrame. ``genres`` is overwritten with the
            transposed padded tensor (NOTE(review): assigning a 2-D tensor to a
            single column is pandas-version dependent - confirm how it is stored).
        feature_names: model-input column names.
        linear_feature_columns, dnn_feature_columns: the fixed-length
            ``SparseFeat`` specs. The variable-length genres feature is NOT
            included in the model inputs.
        target: ``['rating']``.
    """
    data = pd.read_csv(os.path.join(
        DatasetCFG.data_root, 'ratings_data_process.csv'))
    sparse_features = ["movieId", "userId",
                       "gender", "age", "occupation", "zipCode"]
    target = ['rating']
    for feat in sparse_features:
        lbe = LabelEncoder()
        data[feat] = lbe.fit_transform(data[feat])

    # Shared genre vocabulary: filled while encoding, so its size is correct below.
    # (Previously it stayed empty, giving vocabulary_size == 1.)
    key2index = {}

    def encode_genres(genres):
        keys = genres.split('|')
        for key in keys:
            if key not in key2index:
                # 0 is the padding value for sequence input, so ids start at 1.
                key2index[key] = len(key2index) + 1
        return torch.tensor([key2index[key] for key in keys])

    genres_list = [encode_genres(genres) for genres in data['genres'].values]
    max_len = max(len(seq) for seq in genres_list)
    # Notice : padding=`post`; pad_sequence returns shape (max_len, num_rows).
    genres_list = pad_sequence(genres_list)

    # 2.count #unique features for each sparse field
    fixlen_feature_columns = [SparseFeat(feat, data[feat].nunique(), embedding_dim=4)
                              for feat in sparse_features]

    # Spec for the genres sequence feature (+1 for padding id 0). It is built but
    # intentionally not fed to the model: only fixed-length features are used.
    varlen_feature_columns = [VarLenSparseFeat(SparseFeat('genres', vocabulary_size=len(
        key2index) + 1, embedding_dim=4), maxlen=max_len, combiner='mean')]

    linear_feature_columns = fixlen_feature_columns
    dnn_feature_columns = fixlen_feature_columns

    feature_names = get_feature_names(
        linear_feature_columns + dnn_feature_columns)
    data["genres"] = genres_list.T

    return data, feature_names, linear_feature_columns, dnn_feature_columns,  target


def get_datas():
    """Load the ratings CSV and partition it by user into one DataFrame per client.

    Sparse features are label-encoded over the whole dataset before splitting,
    so all clients share one id space. Users are shuffled with NumPy's global
    RNG and dealt out in contiguous runs, so each user's ratings land on exactly
    one client. When the user count is not divisible by ``FedCFG.num_clients``,
    the first ``remainder`` clients receive one extra user. Previously those
    leftover users were silently dropped.

    Returns:
        datas: list of ``FedCFG.num_clients`` DataFrames.
        feature_names, linear_feature_columns, dnn_feature_columns: DeepCTR specs.
        target: ``['rating']``.

    Raises:
        ValueError: if ``FedCFG.num_clients`` is < 1 or exceeds the number of users.
    """
    data = pd.read_csv(os.path.join(
        DatasetCFG.data_root, 'ratings_data_process.csv'))
    sparse_features = ["movieId", "userId",
                       "gender", "age", "occupation", "zipCode"]
    target = ['rating']
    for feat in sparse_features:
        lbe = LabelEncoder()
        data[feat] = lbe.fit_transform(data[feat])

    # 2.count #unique features for each sparse field
    fixlen_feature_columns = [SparseFeat(feat, data[feat].nunique())
                              for feat in sparse_features]
    linear_feature_columns = fixlen_feature_columns
    dnn_feature_columns = fixlen_feature_columns
    feature_names = get_feature_names(
        linear_feature_columns + dnn_feature_columns)

    # One DataFrame per user; shuffled so client assignment is random but seedable.
    user_groups = [group for _, group in data.groupby('userId')]
    np.random.shuffle(user_groups)

    num_clients = FedCFG.num_clients
    if not 1 <= num_clients <= len(user_groups):
        raise ValueError(
            f"num_clients must be between 1 and {len(user_groups)}, got {num_clients}")

    # Same boundaries as before when divisible; otherwise spread the remainder.
    base_size, remainder = divmod(len(user_groups), num_clients)
    datas = []
    start = 0
    for i in range(num_clients):
        end = start + base_size + (1 if i < remainder else 0)
        datas.append(pd.concat(user_groups[start:end]))
        start = end

    return datas, feature_names, linear_feature_columns, dnn_feature_columns,  target

### 联邦学习


In [28]:
def get_parameters(net) -> List[np.ndarray]:
    """Snapshot every entry of ``net.state_dict()`` as a CPU NumPy array.

    The list follows the state-dict key order, which is how the arrays are
    mapped back onto the model when they are loaded again.
    """
    return [tensor.cpu().numpy() for tensor in net.state_dict().values()]


def set_parameters(net, parameters: List[np.ndarray]):
    """Load a list of NumPy arrays into ``net`` in ``state_dict`` key order.

    Args:
        net: torch module whose state dict is overwritten (``strict=True``).
        parameters: one array per state-dict entry, in state-dict order.

    Raises:
        ValueError: if the number of arrays differs from the number of
            state-dict entries. ``zip`` would otherwise silently drop extras.
    """
    keys = list(net.state_dict().keys())
    if len(parameters) != len(keys):
        raise ValueError(
            f"expected {len(keys)} parameter arrays, got {len(parameters)}")
    # torch.tensor keeps each array's dtype (e.g. int64 BatchNorm counters) and
    # handles 0-d arrays; the legacy torch.Tensor(...) constructor forced float32.
    state_dict = OrderedDict(
        (key, torch.tensor(np.asarray(value))) for key, value in zip(keys, parameters))
    net.load_state_dict(state_dict, strict=True)


class FlowerClient(fl.client.Client):
    """Flower client that trains and evaluates one DeepCTR model on a local shard.

    Args:
        cid: client id string passed in by ``client_fn``.
        net: compiled DeepCTR model exposing ``fit``/``predict``.
        train_dataloader: training model input, either a DataFrame or a
            ``{feature_name: column}`` dict as built for DeepCTR.
        val_dataset: validation DataFrame; supplies the ground-truth labels.
        val_dataloader: validation model input, same form as ``train_dataloader``.
        target: list of label column names, e.g. ``['rating']``.
        train_dataset: optional training DataFrame used for the training labels.
            Required when ``train_dataloader`` is a feature dict, because such a
            dict carries no label column.
    """

    def __init__(self, cid, net, train_dataloader, val_dataset, val_dataloader, target,
                 train_dataset=None):
        self.cid = cid
        self.net = net
        self.train_dataloader = train_dataloader
        self.val_dataset = val_dataset
        self.val_dataloader = val_dataloader
        self.target = target
        self.train_dataset = train_dataset

    @staticmethod
    def _num_rows(model_input):
        """Number of examples in a DataFrame/array or a ``{feature: column}`` dict."""
        if isinstance(model_input, dict):
            # len(dict) would be the number of features, not examples.
            return len(next(iter(model_input.values()))) if model_input else 0
        return len(model_input)

    def _train_labels(self):
        """Training labels as an array, taken from ``train_dataset`` when given."""
        if self.train_dataset is not None:
            return self.train_dataset[self.target].values
        if isinstance(self.train_dataloader, dict):
            raise ValueError(
                "train_dataloader is a feature dict without labels; "
                "pass train_dataset to FlowerClient")
        return self.train_dataloader[self.target].values

    def get_parameters(self, ins: GetParametersIns) -> GetParametersRes:
        ndarrays: List[np.ndarray] = get_parameters(self.net)
        # Serialize ndarray's into a Parameters object
        parameters = ndarrays_to_parameters(ndarrays)
        # Build and return response
        status = Status(code=Code.OK, message="Success")
        return GetParametersRes(
            status=status,
            parameters=parameters,
        )

    def fit(self, ins: FitIns) -> FitRes:
        print(f"[Client {self.cid}] fit, config: {ins.config}")
        # Load the global model sent by the server.
        set_parameters(self.net, parameters_to_ndarrays(ins.parameters))
        # The server may override local hyper-parameters; defaults keep the old values.
        epochs = int(ins.config.get("local_epochs", 10))
        batch_size = int(ins.config.get("batch_size", 256))
        self.net.fit(self.train_dataloader, self._train_labels(),
                     batch_size=batch_size, epochs=epochs, verbose=2, validation_split=0.2)
        parameters_updated = ndarrays_to_parameters(get_parameters(self.net))
        status = Status(code=Code.OK, message="Success")
        return FitRes(
            status=status,
            parameters=parameters_updated,
            # FedAvg weights clients by this, so it must be the row count.
            num_examples=self._num_rows(self.train_dataloader),
            metrics={},
        )

    def evaluate(self, ins: EvaluateIns) -> EvaluateRes:
        print(f"[Client {self.cid}] evaluate, config: {ins.config}")
        set_parameters(self.net, parameters_to_ndarrays(ins.parameters))
        pred_ans = self.net.predict(self.val_dataloader, batch_size=256)
        # Full precision: the server aggregates this loss, so don't round it here.
        mse = float(mean_squared_error(
            self.val_dataset[self.target].values, pred_ans))

        status = Status(code=Code.OK, message="Success")
        return EvaluateRes(
            status=status,
            loss=mse,
            num_examples=self._num_rows(self.val_dataloader),
            metrics={"mse": mse},
        )


# device = torch.device('cpu' if torch.cuda.is_available() else 'cpu')
# datas, feature_names, linear_feature_columns, dnn_feature_columns, target = get_datas()
# train_datasets = [None for _ in range(FedCFG.num_clients)]
# val_datasets = [None for _ in range(FedCFG.num_clients)]
# train_dataloaders = [None for _ in range(FedCFG.num_clients)]
# val_dataloaders = [None for _ in range(FedCFG.num_clients)]

# for i in tqdm(range(FedCFG.num_clients)):
#     train_datasets[i], val_datasets[i] = train_test_split(
#         datas[i], test_size=0.2, random_state=i)
#     train_dataloaders[i] = {name: train_datasets[i][name]
#                             for name in feature_names}
#     val_dataloaders[i] = {name: val_datasets[i][name]
#                           for name in feature_names}


def client_fn(cid: str) -> FlowerClient:
    """Create the Flower client for simulation partition ``cid``.

    Builds a fresh regression DeepFM from the module-level feature columns
    and ``device``, then pairs it with the ``cid``-th entries of the
    module-level ``train_dataloaders``, ``val_datasets`` and
    ``val_dataloaders`` lists.

    NOTE(review): in the visible notebook those three lists are only created
    by commented-out set-up code, so re-enable it before calling this.
    """
    model = DeepFM(linear_feature_columns, dnn_feature_columns,
                   task='regression', device=device)
    model.compile("adam", "mse", metrics=['mse'], )
    shard = int(cid)
    return FlowerClient(
        cid,
        model,
        train_dataloaders[shard],
        val_datasets[shard],
        val_dataloaders[shard],
        target,
    )


# strategy = fl.server.strategy.FedAvg(
#     fraction_fit=1.0,
#     fraction_evaluate=0.5,
#     min_fit_clients=10,
#     min_evaluate_clients=5,
#     min_available_clients=10,
# )

# client_resources = None
# if device.type == "cuda":
#     client_resources = {"num_gpus": 1}

# fl.simulation.start_simulation(
#     client_fn=client_fn,
#     num_clients=FedCFG.num_clients,
#     config=fl.server.ServerConfig(num_rounds=5),
#     strategy=strategy,
#     client_resources=client_resources,
# )

In [29]:
data, feature_names, linear_feature_columns, dnn_feature_columns, target = get_data()
# Persist the feature metadata; `with` guarantees each file is flushed and closed
# (bare open(...) handles were left to the garbage collector).
with open('feature_names.pkl', 'wb') as fh:
    pickle.dump(feature_names, fh)
with open('linear_feature_columns.pkl', 'wb') as fh:
    pickle.dump(linear_feature_columns, fh)
with open('dnn_feature_columns.pkl', 'wb') as fh:
    pickle.dump(dnn_feature_columns, fh)
# 3.generate input data for model
# Explicit random_state: the split no longer depends on whatever np.random calls
# ran earlier in the kernel, so re-running this cell gives the same split.
train, test = train_test_split(data, test_size=0.2, random_state=42)
train_model_input = {name: train[name] for name in feature_names}
test_model_input = {name: test[name] for name in feature_names}
# 4.Define Model,train,predict and evaluate

# Prefer the second GPU as before, but fall back so the cell also runs on
# machines with a single GPU or none.
if torch.cuda.device_count() > 1:
    device = 'cuda:1'
elif torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'

model = DeepFM(linear_feature_columns, dnn_feature_columns, dnn_hidden_units=(512, 256, 128),
               task='regression', device=device)
model.compile("adam", "mse", metrics=['mse'], )

history = model.fit(train_model_input, train[target].values,
                    batch_size=256, epochs=20, verbose=1, validation_split=0.2)
torch.save(model.state_dict(), 'model.pkl')
pred_ans = model.predict(test_model_input, batch_size=256)
print("test MSE", round(mean_squared_error(
    test[target].values, pred_ans), 4))

cuda:1
Train on 640133 samples, validate on 160034 samples, 2501 steps per epoch


2501it [00:21, 117.05it/s]


Epoch 1/20
22s - loss:  0.9539 - mse:  0.9539 - val_mse:  0.8469


2501it [00:21, 119.06it/s]


Epoch 2/20
22s - loss:  0.8350 - mse:  0.8350 - val_mse:  0.8301


2501it [00:22, 110.08it/s]


Epoch 3/20
23s - loss:  0.8178 - mse:  0.8178 - val_mse:  0.8194


2501it [00:21, 118.27it/s]


Epoch 4/20
22s - loss:  0.8032 - mse:  0.8032 - val_mse:  0.8099


2501it [00:21, 116.52it/s]


Epoch 5/20
22s - loss:  0.7889 - mse:  0.7889 - val_mse:  0.8004


2501it [00:21, 117.03it/s]


Epoch 6/20
22s - loss:  0.7718 - mse:  0.7718 - val_mse:  0.7876


1773it [00:14, 120.03it/s]


KeyboardInterrupt: 