## Federated Learning Scheduler

This notebook considers how to schedule training in a federated learning environment. Scheduling matters because processing power, upload speed, download speed, and data size vary greatly between mobile devices.

There are two problems we're trying to address here. The first is: given a set of models we can train, if we are resource constrained and can only train some, which should we train first? The second is: given a model we've decided to train, how do we decide how many devices, and which devices, it should train on?

In [1]:
import math

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision.models as models
from graphviz import Source
from networkx.drawing import nx_pydot
from networkx.drawing.nx_agraph import graphviz_layout
from ortools.linear_solver import pywraplp
from ortools.sat.python import cp_model
from ptflops import get_model_complexity_info
from pytorch_modelsize import SizeEstimator
from scipy.sparse import csr_matrix
from scipy.sparse.csgraph import maximum_bipartite_matching
from torch.autograd import Variable
from torchsummary import summary

### Which models when?

Given a bunch of models we can train in parallel, if we are resource constrained and can only train some, which should we train first?

Our goal here is to maximize our profit per unit time, $z$, which is a function of how much we sold a dataset for, $p$, and how long we anticipate it will take to train, $t$. In other words, we're seeking the selection of models that maximizes our total profit per unit time ($p/t$ summed over the selected models) while satisfying our resource constraints for a set of $M$ models and $X$ devices. As the cost of training itself is negligible (compute is essentially free since it runs on mobile devices), this makes more sense than maximizing gross profit.

More succinctly, treating the compute cost per unit time $c$ as negligible: <br />
$z = \displaystyle\sum_{i = 0}^{\bar{M}}(\frac{p_{i}}{t_{i}})m_{i}$ <br />
For all $i \in M$; $m_{i}$ is a decision variable set to 0 or 1 <br />
With constraints: <br />
$\displaystyle\sum_{i = 0}^{\bar{M}}m_{i} \leq \bar{M}$ <br />
$\displaystyle\sum_{i = 0}^{\bar{M}}m_{i} \leq \bar{X}$ <br />
$\forall m_{i} \in M, m_{i} \in \{0,1\}$

We'll assume that $p_i$ and $t_i$ are known $\forall i \mid 0 \leq i \leq \bar{M}$. This is a reasonable assumption: on deployment we will know $p_i$, and we can estimate $t_i$ from the model type and number of parameters, both of which are known. We'll also assume $\bar{M}$ and $\bar{X}$ are known. So let's run through an example, and then generalize our solution into an algorithm.

In [2]:
solver = pywraplp.Solver.CreateSolver('SCIP')

In [4]:
#load in CSV with info on models
df = pd.read_csv('data/or_models_example.csv')
df

Unnamed: 0,models,p,t
0,m1,100,20
1,m2,300,50
2,m3,200,10
3,m4,30,60


In [5]:
m1 = solver.IntVar(0.0, 1.0, 'm1')
m2 = solver.IntVar(0.0, 1.0, 'm2')
m3 = solver.IntVar(0.0, 1.0, 'm3')
m4 = solver.IntVar(0.0, 1.0, 'm4')

In [6]:
print('Number of variables =', solver.NumVariables())

Number of variables = 4


In [7]:
X = 3

In [8]:
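#build our constraints and input into google OR
#at most M-bar = 4 models can be selected
solver.Add(m1 + m2 + m3 + m4 <= 4)
#at most X-bar = X devices are available (one device per model for now)
solver.Add(m1 + m2 + m3 + m4 <= X)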
print('Number of constraints =', solver.NumConstraints())

Number of constraints = 2


In [9]:
#build our objective function: sum of (p/t) * m over all models
solver.Maximize((100/20)*m1 + (300/50)*m2 + (200/10)*m3 + (30/60)*m4)

In [10]:
status = solver.Solve()

In [11]:
if status == pywraplp.Solver.OPTIMAL:
    print('Solution:')
    print('Objective value =', solver.Objective().Value())
    print('m1 =', m1.solution_value())
    print('m2 =', m2.solution_value())
    print('m3 =', m3.solution_value())
    print('m4 =', m4.solution_value())
else:
    print('The problem does not have an optimal solution.')

Solution:
Objective value = 31.0
m1 = 1.0
m2 = 1.0
m3 = 1.0
m4 = 0.0
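As a sanity check on the solver output, the same selection can be reproduced without a solver. The sketch below is not part of the original notebook: it assumes one device per model (as in the constraints above), hard-codes the four example rows, and simply ranks the models by $p/t$. The names `ranked`, `chosen`, `greedy_names`, and `greedy_z` are illustrative.

```python
# Example data copied from the table above: (name, price p, training time t)
models = [("m1", 100, 20), ("m2", 300, 50), ("m3", 200, 10), ("m4", 30, 60)]
x_bar = 3  # number of models we can train at once (one device each)

# Rank by profit per unit time, highest first, and keep the top x_bar
ranked = sorted(models, key=lambda m: m[1] / m[2], reverse=True)
chosen = ranked[:x_bar]

greedy_names = sorted(name for name, _, _ in chosen)
greedy_z = sum(p / t for _, p, t in chosen)
print(greedy_names, greedy_z)
```

Ranking is only guaranteed to match the solver while every model consumes the same amount of resource; once models differ in how many devices they need, a ranking is no longer sufficient.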


This solution should make sense. In fact it is trivial: all we've done is pick the models with the highest price-to-time ratios. Let's add a layer of complexity. Each model has a characteristic batch size, which is the number of mobile devices the model can train on in parallel. This means we must change our second constraint, based on a set of batch sizes $B$ for the models in $M$, from: <br />
$\displaystyle\sum_{i = 0}^{\bar{M}}m_{i} \leq \bar{X}$  to <br />
$\displaystyle\sum_{i = 0}^{\bar{M}}b_i m_{i} \leq \bar{X}$, where $b_i \in B$ is the batch size for model $m_i$.



Let's run our example again, now including batch size and using our new constraint.

In [12]:
#load in CSV with info on models
df = pd.read_csv('data/or_models_example_batch.csv')
df

Unnamed: 0,models,p,t,batch
0,m1,100,20,4
1,m2,300,50,6
2,m3,200,10,10
3,m4,30,60,2


In [13]:
solver = pywraplp.Solver.CreateSolver('SCIP')
m1 = solver.IntVar(0.0, 1.0, 'm1')
m2 = solver.IntVar(0.0, 1.0, 'm2')
m3 = solver.IntVar(0.0, 1.0, 'm3')
m4 = solver.IntVar(0.0, 1.0, 'm4')
print('Number of variables =', solver.NumVariables())

Number of variables = 4


In [14]:
#build our constraints and input into google OR
solver.Add(m1 + m2 + m3 + m4 <= 4)
#each model now occupies batch-size devices out of the X = 3 available
solver.Add(4*m1 + 6*m2 + 10*m3 + 2*m4 <= X)
print('Number of constraints =', solver.NumConstraints())

Number of constraints = 2


In [15]:
#build our objective function: sum of (p/t) * m over all models
solver.Maximize((100/20)*m1 + (300/50)*m2 + (200/10)*m3 + (30/60)*m4)
status = solver.Solve()

In [16]:
if status == pywraplp.Solver.OPTIMAL:
    print('Solution:')
    print('Objective value =', solver.Objective().Value())
    print('m1 =', m1.solution_value())
    print('m2 =', m2.solution_value())
    print('m3 =', m3.solution_value())
    print('m4 =', m4.solution_value())
else:
    print('The problem does not have an optimal solution.')

Solution:
Objective value = 0.5
m1 = 0.0
m2 = 0.0
m3 = 0.0
m4 = 1.0
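With batch sizes in the second constraint, the problem has the shape of a 0/1 knapsack: each model has a value $p_i/t_i$ and a weight $b_i$, and the device budget $\bar{X}$ is the capacity. For an instance this small, the solver result can be cross-checked by enumerating every subset. This is a minimal sketch rather than a replacement for the solver; `best_subset` is a hypothetical helper and the rows are copied from the table above.

```python
from itertools import combinations

# (name, price p, training time t, batch size b), copied from the table above
models = [("m1", 100, 20, 4), ("m2", 300, 50, 6), ("m3", 200, 10, 10), ("m4", 30, 60, 2)]

def best_subset(models, x_bar):
    """Enumerate every subset and keep the feasible one with the highest sum of p/t."""
    best_names, best_z = (), 0.0
    for size in range(1, len(models) + 1):
        for subset in combinations(models, size):
            devices = sum(b for _, _, _, b in subset)
            z = sum(p / t for _, p, t, _ in subset)
            if devices <= x_bar and z > best_z:
                best_names, best_z = tuple(name for name, *_ in subset), z
    return best_names, best_z

print(best_subset(models, 3))   # only m4 (batch size 2) fits on 3 devices
print(best_subset(models, 10))  # a larger device budget changes the choice
```

Enumeration grows as $2^{\bar{M}}$, so it is only useful as a check on small examples.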


Each model requires a different number of cycles to train. Every cycle we need to make this decision. So, now that we've established a realistic objective function and constraints, let's turn this into an algorithm that can be used on deployment to queue up models each cycle.

In [17]:
#load in CSV with cycle and batch info
df = pd.read_csv('data/or_models_example_cycle.csv')
df

Unnamed: 0,models,p,t,batch,cycle
0,m1,100,20,4,2
1,m2,300,50,6,3
2,m3,200,10,10,5
3,m4,30,60,2,4


In [62]:
#this will perform our optimization given the dataframe above
def find_next(df, m_bar, x_bar):
    solver = pywraplp.Solver.CreateSolver('SCIP')
    objective = 0
    count_constraint = 0
    device_constraint = 0
    model_vars = []
    #instantiate a binary decision variable for each model
    for name, p, t, b in zip(df['models'], df['p'], df['t'], df['batch']):
        var = solver.IntVar(0.0, 1.0, name)
        model_vars.append(var)
        #objective function: profit per unit time summed over each model
        objective = objective + (float(p) / float(t)) * var
        #first constraint counts models, second counts devices used
        count_constraint = count_constraint + var
        device_constraint = device_constraint + int(b) * var
    #build our constraints and input into google OR
    solver.Add(count_constraint <= m_bar)
    solver.Add(device_constraint <= x_bar)
    solver.Maximize(objective)
    status = solver.Solve()
    if status != pywraplp.Solver.OPTIMAL:
        raise RuntimeError('The problem does not have an optimal solution.')
    #put solutions and max objective value into dictionary, return
    solution = {}
    for name, var in zip(df['models'], model_vars):
        solution[name] = var.solution_value()
    solution['Z'] = solver.Objective().Value()
    return solution

In [63]:
#here's an example call for one cycle
find_next(df, 4, 10)

{'m1': -0.0, 'm2': -0.0, 'm3': 1.0, 'm4': -0.0, 'Z': 20.0}

In [20]:
solutions = []
def queue_cycle(df, m_bar, x_bar):
    #work on a copy so the caller's dataframe is left untouched
    df = df.copy()
    while not df.empty:
        print(df)
        #find next models for this cycle
        solution = find_next(df, m_bar, x_bar)
        solutions.append(solution)
        #solver values are floats, so compare against 0.5 rather than == 1
        #(looking up by model name also skips the 'Z' entry)
        selected = df['models'].map(lambda name: solution[name] > 0.5)
        #stop if no model fits on the available devices, otherwise we never finish
        if not selected.any():
            break
        #decrement num cycles for every model trained this cycle
        df.loc[selected, 'cycle'] -= 1
        #if any model has 0 num cycles remaining, remove
        df = df[df['cycle'] > 0].reset_index(drop=True)
        m_bar = df.shape[0]

In [21]:
# let's try this out with the same sample dataset
queue_cycle(df, 4, 10)

  models    p   t  batch  cycle
0     m1  100  20      4      2
1     m2  300  50      6      3
2     m3  200  10     10      5
3     m4   30  60      2      4
  models    p   t  batch  cycle
0     m1  100  20      4      2
1     m2  300  50      6      3
2     m3  200  10     10      4
3     m4   30  60      2      4
  models    p   t  batch  cycle
0     m1  100  20      4      2
1     m2  300  50      6      3
2     m3  200  10     10      3
3     m4   30  60      2      4
  models    p   t  batch  cycle
0     m1  100  20      4      2
1     m2  300  50      6      3
2     m3  200  10     10      2
3     m4   30  60      2      4
  models    p   t  batch  cycle
0     m1  100  20      4      2
1     m2  300  50      6      3
2     m3  200  10     10      1
3     m4   30  60      2      4
  models    p   t  batch  cycle
0     m1  100  20      4      2
1     m2  300  50      6      3
2     m4   30  60      2      4
  models    p   t  batch  cycle
0     m1  100  20      4      1
1     m2

In [22]:
solutions

[{'m1': -0.0, 'm2': -0.0, 'm3': 1.0, 'm4': -0.0, 'Z': 20.0},
 {'m1': -0.0, 'm2': -0.0, 'm3': 1.0, 'm4': -0.0, 'Z': 20.0},
 {'m1': -0.0, 'm2': -0.0, 'm3': 1.0, 'm4': -0.0, 'Z': 20.0},
 {'m1': -0.0, 'm2': -0.0, 'm3': 1.0, 'm4': -0.0, 'Z': 20.0},
 {'m1': -0.0, 'm2': -0.0, 'm3': 1.0, 'm4': -0.0, 'Z': 20.0},
 {'m1': 1.0, 'm2': 1.0, 'm4': 0.0, 'Z': 11.0},
 {'m1': 1.0, 'm2': 1.0, 'm4': 0.0, 'Z': 11.0},
 {'m2': 1.0, 'm4': 1.0, 'Z': 6.5},
 {'m4': 1.0, 'Z': 0.5},
 {'m4': 1.0, 'Z': 0.5},
 {'m4': 1.0, 'Z': 0.5}]
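One way to check a schedule like this is to count how many cycles each model received and compare that with its `cycle` column. The sketch below is an illustrative helper (`summarize` is not from the original notebook) run on a trimmed copy of the output above, with the zero-valued entries omitted for brevity.

```python
from collections import Counter

def summarize(solutions):
    """Count how many cycles each model was scheduled for and total the objective."""
    cycles = Counter()
    total_z = 0.0
    for solution in solutions:
        for key, value in solution.items():
            if key == "Z":
                total_z += value
            elif value > 0.5:  # solver values are floats, so avoid == 1
                cycles[key] += 1
    return dict(cycles), total_z

# Trimmed copy of the schedule above (entries equal to zero omitted)
example = (
    [{"m3": 1.0, "Z": 20.0}] * 5
    + [{"m1": 1.0, "m2": 1.0, "Z": 11.0}] * 2
    + [{"m2": 1.0, "m4": 1.0, "Z": 6.5}]
    + [{"m4": 1.0, "Z": 0.5}] * 3
)
cycles, total_z = summarize(example)
print(cycles, total_z)
```

Each count should equal the number of cycles the model required (2, 3, 5, and 4 for m1 through m4), which confirms that every model finishes training over the eleven scheduled cycles.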

### Train Where?

On to our next problem. Now that we know what models to train, how can we decide where to train them?

We know that in each cycle, a set $N$ of mobile devices runs a set $R$ of distributed training tasks. We also know that the cost of training is time, since variable compute is essentially free (it happens on the mobile device), and that time has three components: computation time, upload time, and download time. <br />
Computation time: $T_{n}^{c}(D_r)$, where $D_r$ is the local data on the mobile device running task $r$ <br />
Upload time: $T_{n}^{u}(M)$, where $M$ is the model being trained <br />
Download time: $T_{n}^{d}(M)$, where $M$ is the model being trained <br />

Let's define our objective function: <br />

$\displaystyle\min_{Y \in \alpha} \max_{n \in N} (T_{n}^{c}(D_r)+ T_{n}^{u}(M) + T_{n}^{d}(M))y_{nr}$ <br />

where $y_{nr} \in Y$, the assignment matrix, and $\alpha$ is the set of all permutations of the assignment matrix. <br />

Now let's define our constraints. 1. The sum of all local data on the mobile devices should equal $D$. 2. Each task is assigned to exactly one device. 3. Each device is assigned at most one task, since there can be more devices than tasks. <br />

$\displaystyle\sum_{r \in R} D_r = D$ <br />
$\displaystyle\sum_{n \in N} y_{nr} = 1, r \in R$ <br />
$\displaystyle\sum_{r \in R} y_{nr} \leq 1, n \in N$ <br />
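To make the objective concrete, the sketch below evaluates it for one candidate assignment matrix. It is illustrative only: `bottleneck_cost` is a hypothetical helper, `cost[n][r]` stands for the combined time $T_{n}^{c}(D_r) + T_{n}^{u}(M) + T_{n}^{d}(M)$, and the numbers are made up. It assumes devices may sit idle when there are more devices than tasks, so it only rejects a device that holds more than one task.

```python
def bottleneck_cost(cost, y):
    """Return the largest cost[n][r] among assigned (device, task) pairs.

    cost[n][r] is the total time for device n to run task r; y[n][r] is 0 or 1.
    Raises ValueError if a task is not assigned exactly once or a device
    holds more than one task.
    """
    n_devices, n_tasks = len(cost), len(cost[0])
    for r in range(n_tasks):
        if sum(y[n][r] for n in range(n_devices)) != 1:
            raise ValueError(f"task {r} must be assigned to exactly one device")
    for n in range(n_devices):
        if sum(y[n]) > 1:
            raise ValueError(f"device {n} is assigned more than one task")
    return max(cost[n][r] * y[n][r] for n in range(n_devices) for r in range(n_tasks))

# Hypothetical costs in seconds for 3 devices (rows) and 2 tasks (columns)
cost = [[12.0, 15.0], [9.0, 20.0], [30.0, 11.0]]
y = [[1, 0], [0, 0], [0, 1]]  # device 0 runs task 0, device 2 runs task 1
print(bottleneck_cost(cost, y))
```

The outer minimization in the objective then amounts to searching over all valid matrices `y` for the one with the smallest returned value.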




The naive solution here would be to divide $D$ across the devices in every possible way, and treat each split as a linear bottleneck assignment problem. Let's take a stab at the naive optimization below.

If we take our data across devices to be IID (independent and identically distributed), we can treat federated scheduling as a variation of the linear bottleneck assignment problem. Specifically, there are a number of agents (mobile devices) and tasks (models to train). Any agent can perform any task for some cost that varies with the agent/task assignment. We want to minimize the maximum cost over all agent/task assignments.
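A common way to solve a linear bottleneck assignment problem directly is the threshold method: pick a cost limit, keep only the agent/task pairs at or below it, and test whether every task can still be matched to its own agent. The smallest limit that admits such a matching is the optimal bottleneck. The sketch below is a minimal pure-Python version under that assumption, using a simple augmenting-path matching; `can_assign` and `bottleneck_assignment` are hypothetical names and the cost matrix is made up. A library routine such as `scipy.sparse.csgraph.maximum_bipartite_matching` could presumably take the place of the hand-written matching step.

```python
def can_assign(cost, limit):
    """True if every task can get its own device using only costs <= limit."""
    n_devices, n_tasks = len(cost), len(cost[0])
    device_of = {}  # device index -> task currently matched to it

    def try_task(r, seen):
        # Look for a free device, or one whose current task can move elsewhere
        for n in range(n_devices):
            if cost[n][r] <= limit and n not in seen:
                seen.add(n)
                if n not in device_of or try_task(device_of[n], seen):
                    device_of[n] = r
                    return True
        return False

    return all(try_task(r, set()) for r in range(n_tasks))

def bottleneck_assignment(cost):
    """Smallest achievable maximum cost over all valid assignments."""
    # Try candidate limits in increasing order; the first feasible one is optimal
    for limit in sorted({c for row in cost for c in row}):
        if can_assign(cost, limit):
            return limit
    raise ValueError("no valid assignment: fewer devices than tasks")

# Hypothetical costs in seconds for 3 devices (rows) and 2 tasks (columns)
cost = [[12.0, 15.0], [9.0, 20.0], [30.0, 11.0]]
print(bottleneck_assignment(cost))
```

Scanning the limits in order keeps the sketch short; a binary search over the sorted limits would need fewer matching calls on larger matrices.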

In [2]:
class Model(nn.Module):

    def __init__(self):
        super(Model,self).__init__()

        self.conv0 = nn.Conv2d(1, 16, kernel_size=3, padding=5)
        self.conv1 = nn.Conv2d(16, 32, kernel_size=3)

    def forward(self, x):
        h = self.conv0(x)
        h = self.conv1(h)
        return h

model = Model()

se = SizeEstimator(model, input_size=(16,1,256,256))
se.estimate_size()[0]



408.283935546875

In [3]:
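# function to find all of our permutations: every way to split k units
# of data across n devices (each result is a list of n non-negative
# integers that sums to k)
def findpermutations(n,k):
    if k < 0 or n < 0:
        raise ValueError("n and k must be non-negative")
    if not k: return [[0]*n]
    if not n: return []
    if n == 1: return [[k]]
    return [[0]+val for val in findpermutations(n-1,k)] + \
        [[val[0]+1]+val[1:] for val in findpermutations(n,k-1)]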
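The lists produced here are the ways of writing $k$ as an ordered sum of $n$ non-negative integers, so by the stars-and-bars argument there are $\binom{k+n-1}{n-1}$ of them. The sketch below is an alternative, non-recursive generator based on that argument; `compositions` is a hypothetical name and is not used elsewhere in the notebook. It can serve as a cross-check on the number of splits.

```python
from itertools import combinations
from math import comb

def compositions(n, k):
    """Yield every list of n non-negative integers that sums to k."""
    if n == 0:
        if k == 0:
            yield []
        return
    # Place n - 1 bars among k + n - 1 slots; gaps between bars are the parts
    for bars in combinations(range(k + n - 1), n - 1):
        edges = (-1,) + bars + (k + n - 1,)
        yield [edges[i + 1] - edges[i] - 1 for i in range(n)]

parts = list(compositions(3, 13))
print(len(parts), comb(13 + 3 - 1, 3 - 1))
```

Because the count grows quickly with both $n$ and $k$, enumerating every split is only practical for small numbers of devices and coarse data units.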

In [4]:
df = pd.read_csv('data/or_models_example_cycle.csv')
df

Unnamed: 0,models,p,t,batch,cycle
0,m1,100,20,4,2
1,m2,300,50,6,3
2,m3,200,10,10,5
3,m4,30,60,2,4


In [5]:
def LBAP(cost_matrix):
    # cost_matrix[i][y] is the cost of training model y on device i
    solution = {}
    time = {}
    label_solve = []
    mod_con = []
    eq = 0
    con2 = 0
    n_models = 0
    #declare google OR model
    model = cp_model.CpModel()
    #for each device
    for i, dev in enumerate(cost_matrix):
        con = 0
        n_models = len(dev)
        # for each model cost for a given device
        for y, mod in enumerate(dev):
            name = 'dev' + str(i) + 'm' + str(y)
            time[name] = mod
            label = model.NewBoolVar(name)
            label_solve.append(label)
            #construct objective function (CP-SAT needs integer
            #coefficients, so each cost is truncated to an int)
            eq = eq + int(mod) * label
            #construct constraints
            con = con + label
            con2 = con2 + label
            if i == 0:
                mod_con.append(label)
            else:
                mod_con[y] = mod_con[y] + label
        #each device trains at most one model
        model.Add(con <= 1)
    #the total number of assignments equals the number of models
    model.Add(con2 == n_models)
    #construct constraints (each model must be trained exactly once)
    for cons in mod_con:
        model.Add(cons == 1)
    #note: this minimizes the summed cost; the maximum is only reported
    model.Minimize(eq)
    solver = cp_model.CpSolver()
    status = solver.Solve(model)
    if status == cp_model.OPTIMAL:
        maxi = 0
        for label in label_solve:
            solution[label] = solver.Value(label)
            if solver.Value(label) == 1:
                #largest untruncated cost among the chosen assignments
                maxi = max(maxi, time[str(label)])
        solution['Z'] = solver.ObjectiveValue()
        solution['max'] = maxi
    return solution

In [13]:
def assign_models(D, N_speed, models, df, input_shapes):
    # generate all possible ways to split D units of data over the
    # devices, one entry per device
    perms = findpermutations(len(N_speed), D)
    print(len(perms))
    #estimated size of each trained model in mb (does not depend on the split)
    sizes = [SizeEstimator(model, input_size=shape).estimate_size()[0]
             for model, shape in zip(models, input_shapes)]
    min_max = float('inf')
    final_solution = None
    # for each potential permutation
    for perm in perms:
        cost_matrix = []
        for y, speed in enumerate(N_speed):
            #make row for this mobile device for all models
            cost_model = []
            for i, size in enumerate(sizes):
                #calculate download and upload time
                #mb / (mb/s) = seconds; speed is (download, upload)
                download = size / speed[0]
                upload = size / speed[1]
                computation = df.loc[i, 't']/(df.loc[i, 'batch']*df.loc[i, 'cycle'])
                #scale computation by this device's share of the data
                adj = perm[y]/D
                cost_model.append(upload + download + computation*adj)
            #append mobile device row to cost matrix
            cost_matrix.append(cost_model)
        # perform LBAP for each data partition permutation
        solution = LBAP(cost_matrix)
        #keep track of minimum maximum (skip splits with no optimal solution)
        if solution and solution['max'] < min_max:
            min_max = solution['max']
            final_solution = solution
    return final_solution

In [14]:
#run an example
sol = assign_models(D=13, N_speed=[(10,5), (4,6), (3,2)], models=[model,model], df=df,input_shapes=[(16,1,256,256), (16,1,256,256)])

105


In [15]:
sol

{dev0m0(0..1): 1,
 dev0m1(0..1): 0,
 dev1m0(0..1): 0,
 dev1m1(0..1): 1,
 dev2m0(0..1): 0,
 dev2m1(0..1): 0,
 'Z': 292.0,
 'max': 170.11830647786456}
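The reported `max` can be traced back to the transfer times alone. The sketch below recomputes download plus upload time for each device from the model size estimated earlier and the `(download, upload)` speeds passed to `assign_models`; the helper name `transfer_time` is illustrative, and the speeds are assumed to be in MB/s as the code comments suggest.

```python
def transfer_time(size_mb, download_speed, upload_speed):
    """Seconds to download and then upload a model of size_mb megabytes."""
    return size_mb / download_speed + size_mb / upload_speed

size_mb = 408.283935546875          # SizeEstimator output shown earlier
speeds = [(10, 5), (4, 6), (3, 2)]  # (download, upload) per device, from the example call
times = [transfer_time(size_mb, d, u) for d, u in speeds]
print(times)
```

The second device's transfer time matches the reported `max` to within floating-point rounding, which suggests the winning split gives that device no data at all. In this example the computation term is at most a few seconds, so transfer time dominates the cost and the slowest device is left idle.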