In [None]:
# autotime reports the run time of every executed cell (the "time: ..." lines in the outputs).
%load_ext autotime
# autoreload in mode 2 re-imports modules before each cell runs, so edits to project code are picked up.
%load_ext autoreload
%autoreload 2

# Distributed Machine Learning

Eine der grössten Schwierigkeiten ist es, passende ML-Algorithmen zu finden. Die meisten Algorithmen werden aus Performancegründen in C, C++ oder ähnlichen Sprachen geschrieben. R und Python sind für die Entwicklung von Algorithmen ungeeignet. Um dennoch aufzuzeigen, dass Federated Learning einen Mehrwert bringt, werden wir einen Cluster von Docker-Containern aufbauen. Dieser Cluster besteht aus 2 Worker Nodes und einem Master Node. Gesteuert werden diese mit [Open MPI](https://www.open-mpi.org/) (Open Source High Performance Computing - Library). MPI wird sowohl von XGBoost als auch von Microsofts LightGBM unterstützt. Letzteres verwenden wir an dieser Stelle, da sich die Implementierung weniger komplex gestaltet.

## LightGBM
[LightGBM Distributed Learning](https://github.com/microsoft/LightGBM/blob/master/docs/Parallel-Learning-Guide.rst)





## Data Preparation


In [None]:
#export
import lightgbm as lgb
import numpy as np
import pandas as pd

from community_learning import base_model
from community_learning import features
from itertools import compress
from community_learning.example import get_prediction, evaluate_predictions, get_two_region_data

In [None]:
# Load the data bundle; `data` is a dict whose keys are listed in the next cell.
data = get_two_region_data()

load data
prepare data


In [None]:
# Show which entries the data bundle provides.
data.keys()

dict_keys(['product_dict', 'product_reverse_dict', 'feature_cols', 'target_cols', 'train', 'test', 'train_south', 'train_north', 'train_X_south', 'train_y_south', 'train_X_north', 'train_y_north', 'train_X', 'train_y', 'test_south', 'test_north'])

In [None]:
# Write the training data as CSV files for the LightGBM CLI: one file with all rows
# and one per region. Each file holds the feature columns plus the label column 'y'.
export_cols = data['feature_cols'] + ['y']

train = data['train']
train_south = train[train.region == 'south']
train_north = train[train.region == 'north']

for frame, csv_path in (
    (train, 'data/final/data_lightgbm/train.csv'),
    (train_south, 'data/final/data_lightgbm/train_south.csv'),
    (train_north, 'data/final/data_lightgbm/train_north.csv'),
):
    frame[export_cols].to_csv(csv_path, index=False)


time: 3.57 s


In [None]:
# Read the validation set, pass it through base_model.encode_products and write the
# same column selection as the training files (export_cols) to validation_enc.csv.
valid = base_model.encode_products(
    pd.read_csv('data/final/data_lightgbm/validation.csv')
)
valid.loc[:, export_cols].to_csv(
    'data/final/data_lightgbm/validation_enc.csv',
    index=False,
)

time: 176 ms


## train_community.conf
```
task = train
valid_data = validation_enc.csv
boosting_type = gbdt
objective = multiclass
num_class = 22
eta = 0.05
min_child_weight = 1
#subsample = 0.7
subsample = 1
colsample_bytree = 0.7
max_depth = 8
metric = multi_logloss
metric_freq = 1
is_training_metric = true
num_trees = 50
data = train.csv
header = true
label_column = name:y
machine_list_file = mlist.txt
num_machines = 2
tree_learner = data
seed = 0
#num_threads = 1
output_model = model.txt
```


## run cluster training
```
mpiexec --machinefile mlist.txt -npernode 1  ./lightgbm config=train_community.conf
```

In [None]:
# Load the original train/test frames and build the product lookup tables from the
# training frame. `test` and `product_reverse_dict` are reused by the evaluation
# cells further down; `product_reverse_dict` is indexed there with predicted class ids.
# NOTE(review): presumably product_dict is the inverse mapping (product name -> class id) - confirm in base_model.
train_org, test = base_model.load_data()
product_dict = base_model.get_product_dict(train_org)
product_reverse_dict = base_model.get_product_reverse_dict(train_org)

In [None]:
# Reload the data bundle; the evaluation cells below read their test frames from `data`.
data = get_two_region_data()

load data
prepare data


In [None]:
# Load the model stored in model.txt and compute its predictions for data['test'].
bst = lgb.Booster(model_file='data/final/data_lightgbm/model.txt')
test_features = data['test'][base_model.feature_cols]
preds = bst.predict(test_features)

In [None]:
# Rank the classes of every test row by predicted score and keep the seven best.
# The raw scores in `preds` are left untouched, so this cell can be re-run without
# re-running the prediction cell first.
top7_classes = np.fliplr(np.argsort(preds, axis=1))[:, :7]
added_products = pd.Series(
    [[product_reverse_dict[class_id] for class_id in row] for row in top7_classes.tolist()],
    name='added_products',
)

# Score against the frame the predictions were made on (data['test']) so that rows
# of predictions and ground truth are guaranteed to line up.
target_cols = base_model.target_cols
test_data = data['test'].reset_index()
test_data['added_products'] = added_products
# Ground truth per row: the target columns whose flag is set.
test_data['truth_list'] = test_data[target_cols].apply(
    lambda row: list(compress(target_cols, row.values)), axis=1
)
test_data['apk'] = [
    base_model.apk(truth, predicted)
    for truth, predicted in zip(test_data['truth_list'], test_data['added_products'])
]
print(f"mean average precision single node all data = {test_data['apk'].mean()}")

mean average precision single node all data = 0.022738471706301475
time: 1min 2s


In [None]:
# Evaluate the model stored in model_north.txt on the complete `test` frame.
bst = lgb.Booster(model_file='data/final/data_lightgbm/model_north.txt')
class_scores = bst.predict(test[base_model.feature_cols])

# Class ids ordered from highest to lowest score, cut to the seven best per row,
# then translated through product_reverse_dict.
top7 = np.fliplr(np.argsort(class_scores, axis=1))[:, :7]
preds = pd.Series(
    [[product_reverse_dict[class_id] for class_id in row] for row in top7.tolist()],
    name='added_products',
)

target_cols = base_model.target_cols
test_data = test.reset_index()
test_data['added_products'] = preds
# Ground truth per row: the target columns whose flag is set.
test_data['truth_list'] = test_data[target_cols].apply(
    lambda row: list(compress(target_cols, row.values)), axis=1
)
test_data['apk'] = [
    base_model.apk(truth, predicted)
    for truth, predicted in zip(test_data['truth_list'], test_data['added_products'])
]
print(f"mean average precision single node north = {test_data['apk'].mean()}")

mean average precision single node north = 0.022714324920308386


In [None]:
# Evaluate the model stored in model_south.txt on the complete `test` frame.
bst = lgb.Booster(model_file='data/final/data_lightgbm/model_south.txt')
class_scores = bst.predict(test[base_model.feature_cols])

# Class ids ordered from highest to lowest score, cut to the seven best per row,
# then translated through product_reverse_dict.
top7 = np.fliplr(np.argsort(class_scores, axis=1))[:, :7]
preds = pd.Series(
    [[product_reverse_dict[class_id] for class_id in row] for row in top7.tolist()],
    name='added_products',
)

target_cols = base_model.target_cols
test_data = test.reset_index()
test_data['added_products'] = preds
# Ground truth per row: the target columns whose flag is set.
test_data['truth_list'] = test_data[target_cols].apply(
    lambda row: list(compress(target_cols, row.values)), axis=1
)
test_data['apk'] = [
    base_model.apk(truth, predicted)
    for truth, predicted in zip(test_data['truth_list'], test_data['added_products'])
]
print(f"mean average precision single node south = {test_data['apk'].mean()}")

mean average precision single node south = 0.02279485233961093


In [None]:
# Summarise the bundle as key -> type name. Echoing `data` itself prints every
# entry in full and floods the cell output.
{key: type(value).__name__ for key, value in data.items()}

{'product_dict': {'ind_cco_fin_ult1': 0,
  'ind_cder_fin_ult1': 1,
  'ind_cno_fin_ult1': 2,
  'ind_ctju_fin_ult1': 3,
  'ind_ctma_fin_ult1': 4,
  'ind_ctop_fin_ult1': 5,
  'ind_ctpp_fin_ult1': 6,
  'ind_deco_fin_ult1': 7,
  'ind_dela_fin_ult1': 8,
  'ind_deme_fin_ult1': 9,
  'ind_ecue_fin_ult1': 10,
  'ind_fond_fin_ult1': 11,
  'ind_hip_fin_ult1': 12,
  'ind_nom_pens_ult1': 13,
  'ind_nomina_ult1': 14,
  'ind_plan_fin_ult1': 15,
  'ind_pres_fin_ult1': 16,
  'ind_reca_fin_ult1': 17,
  'ind_recibo_ult1': 18,
  'ind_tjcr_fin_ult1': 19,
  'ind_valo_fin_ult1': 20,
  'ind_viv_fin_ult1': 21},
 'product_reverse_dict': {0: 'ind_cco_fin_ult1',
  1: 'ind_cder_fin_ult1',
  2: 'ind_cno_fin_ult1',
  3: 'ind_ctju_fin_ult1',
  4: 'ind_ctma_fin_ult1',
  5: 'ind_ctop_fin_ult1',
  6: 'ind_ctpp_fin_ult1',
  7: 'ind_deco_fin_ult1',
  8: 'ind_dela_fin_ult1',
  9: 'ind_deme_fin_ult1',
  10: 'ind_ecue_fin_ult1',
  11: 'ind_fond_fin_ult1',
  12: 'ind_hip_fin_ult1',
  13: 'ind_nom_pens_ult1',
  14: 'ind_nomina_

In [None]:
# Evaluate the model stored in model_distributed.txt on data['test_south'].
product_reverse_dict = data['product_reverse_dict']
bst = lgb.Booster(model_file='data/final/data_lightgbm/model_distributed.txt')
class_scores = bst.predict(data['test_south'][base_model.feature_cols])

# Class ids ordered from highest to lowest score, cut to the seven best per row,
# then translated through product_reverse_dict.
top7 = np.fliplr(np.argsort(class_scores, axis=1))[:, :7]
preds = pd.Series(
    [[product_reverse_dict[class_id] for class_id in row] for row in top7.tolist()],
    name='added_products',
)

# `tst` and `test_data` are two names for the same re-indexed copy of the test frame.
tst = test_data = data['test_south'].reset_index()
target_cols = base_model.target_cols
test_data['added_products'] = preds
# Ground truth per row: the target columns whose flag is set.
test_data['truth_list'] = test_data[target_cols].apply(
    lambda row: list(compress(target_cols, row.values)), axis=1
)
test_data['apk'] = [
    base_model.apk(truth, predicted)
    for truth, predicted in zip(test_data['truth_list'], test_data['added_products'])
]
print(f"mean average precision test_south = {test_data['apk'].mean()}")

mean average precision test_south = 0.026443885421329187


In [None]:
# Evaluate the model stored in model_distributed.txt on data['test_north'].
product_reverse_dict = data['product_reverse_dict']
bst = lgb.Booster(model_file='data/final/data_lightgbm/model_distributed.txt')
class_scores = bst.predict(data['test_north'][base_model.feature_cols])

# Class ids ordered from highest to lowest score, cut to the seven best per row,
# then translated through product_reverse_dict.
top7 = np.fliplr(np.argsort(class_scores, axis=1))[:, :7]
preds = pd.Series(
    [[product_reverse_dict[class_id] for class_id in row] for row in top7.tolist()],
    name='added_products',
)

# `tst` and `test_data` are two names for the same re-indexed copy of the test frame.
tst = test_data = data['test_north'].reset_index()
target_cols = base_model.target_cols
test_data['added_products'] = preds
# Ground truth per row: the target columns whose flag is set.
test_data['truth_list'] = test_data[target_cols].apply(
    lambda row: list(compress(target_cols, row.values)), axis=1
)
test_data['apk'] = [
    base_model.apk(truth, predicted)
    for truth, predicted in zip(test_data['truth_list'], test_data['added_products'])
]
print(f"mean average precision test_north = {test_data['apk'].mean()}")

mean average precision test_north = 0.017835593289980797


In [None]:
# Evaluate the model stored in model_distributed.txt on data['test'].
product_reverse_dict = data['product_reverse_dict']
bst = lgb.Booster(model_file='data/final/data_lightgbm/model_distributed.txt')
class_scores = bst.predict(data['test'][base_model.feature_cols])

# Class ids ordered from highest to lowest score, cut to the seven best per row,
# then translated through product_reverse_dict.
top7 = np.fliplr(np.argsort(class_scores, axis=1))[:, :7]
preds = pd.Series(
    [[product_reverse_dict[class_id] for class_id in row] for row in top7.tolist()],
    name='added_products',
)

# `tst` and `test_data` are two names for the same re-indexed copy of the test frame.
tst = test_data = data['test'].reset_index()
target_cols = base_model.target_cols
test_data['added_products'] = preds
# Ground truth per row: the target columns whose flag is set.
test_data['truth_list'] = test_data[target_cols].apply(
    lambda row: list(compress(target_cols, row.values)), axis=1
)
test_data['apk'] = [
    base_model.apk(truth, predicted)
    for truth, predicted in zip(test_data['truth_list'], test_data['added_products'])
]
print(f"mean average precision test = {test_data['apk'].mean()}")

mean average precision test = 0.022793803380779395


In [None]:
# Train a LightGBM multiclass model inside the notebook on data['train_X_north'].
data = get_two_region_data()
param = {
    'objective': 'multiclass',
    'num_class': 22,
    'eta': 0.05,
    'min_child_weight': 1,
    'subsample': 0.7,
    # LightGBM only applies `subsample` (bagging_fraction) when bagging is switched on
    # through a non-zero `subsample_freq` (bagging_freq); without it the 0.7 is ignored.
    'subsample_freq': 1,
    'colsample_bytree': 0.7,
    'max_depth': 8,
    'metric': 'multi_logloss',
    # Fixed seed so row and feature sampling are repeatable across runs.
    'seed': 0,
    'verbosity': 2
}
num_round = 50
train_data = lgb.Dataset(data['train_X_north'][base_model.feature_cols], label=data['train_y_north'])
bst = lgb.train(param, train_data, num_round)

load data
prepare data


In [None]:
# Evaluate the model held in `bst`, which the training cell fits on data['train_X_north'];
# the printed label therefore names the northern model.
# Predictions, ground truth and the class-id lookup all come from the `data` bundle, so
# the cell does not depend on frames or dicts left over from earlier cells.
product_reverse_dict = data['product_reverse_dict']
eval_frame = data['test']
class_scores = bst.predict(eval_frame[base_model.feature_cols])

# Class ids ordered from highest to lowest score, cut to the seven best per row,
# then translated through product_reverse_dict.
top7 = np.fliplr(np.argsort(class_scores, axis=1))[:, :7]
preds = pd.Series(
    [[product_reverse_dict[class_id] for class_id in row] for row in top7.tolist()],
    name='added_products',
)

target_cols = base_model.target_cols
test_data = eval_frame.reset_index()
test_data['added_products'] = preds
# Ground truth per row: the target columns whose flag is set.
test_data['truth_list'] = test_data[target_cols].apply(
    lambda row: list(compress(target_cols, row.values)), axis=1
)
test_data['apk'] = [
    base_model.apk(truth, predicted)
    for truth, predicted in zip(test_data['truth_list'], test_data['added_products'])
]
print(f"mean average precision north = {test_data['apk'].mean()}")

mean average precision south = 0.022730792975139427


In [None]:
# List the entries of the data bundle as currently loaded.
data.keys()

dict_keys(['product_dict', 'product_reverse_dict', 'feature_cols', 'target_cols', 'train', 'test', 'train_south', 'train_north', 'train_X_south', 'train_y_south', 'train_X_north', 'train_y_north', 'test_south', 'test_north'])

## References 

- [Mengwei Yang et al., The Tradeoff Between Privacy and Accuracy in Anomaly Detection Using Federated XGBoost (2019)](https://arxiv.org/abs/1907.07157)
- [Yang, Federated Machine Learning: Concept and Applications (2019)](https://arxiv.org/abs/1902.04885)
- https://mc2-xgboost.readthedocs.io/en/latest/tutorials/distributed.html
- [Tianqi Chen et al., XGBoost (2016)](https://arxiv.org/pdf/1603.02754.pdf)
- [XGBoost Distributed Training and Parallel Predictions with Apache Spark](https://medium.com/cloudzone/xgboost-distributed-training-and-predicting-with-apache-spark-1127cdfb31ae)
- InPrivate Digging: Enabling Tree-based Distributed Data Mining with Differential Privacy