In [None]:
import os
import sys
import copy
import math
import torch
import random
import syft as sy
import numpy as np
import pandas as pd
import tensorflow as tf
import statsmodels.api as sm
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from collections import OrderedDict
sys.path.append(os.getcwd()+'/assistive_functions')

from SMWrapper import SMWrapper
from household import Household, SyNet
from load_data import get_data_of_a_person
from construct_dataset import construct_dataset
from sklearn.model_selection import train_test_split

random.seed(30)
np.random.seed(30)

In [1]:
import torch

In [4]:
test = torch.zeros((10,10))
test.shape(0)

TypeError: 'torch.Size' object is not callable

In [None]:
# candidate lags
def get_lags(step_ahead, num_days=np.array([0,1,7])):
    lags = []
    for i in num_days:
        lags = lags + [48*i, 48*i+1, 48*i+2]
    lags = [x-step_ahead+1 for x in lags if x>=step_ahead]
    return lags

def connect_to_households(household_options):
    # get candidate houses from the selected group
    path = os.getcwd()+"/input/informations_households.csv.xls"
    data = pd.read_csv(path)
    # filter by group
    candidates = data.loc[data.Acorn==household_options['group']]
    # filter by tariff 
    candidates = candidates.loc[candidates.stdorToU==household_options['stdorToU']]
    # print(candidates)    
    # TODO: shuffle
    households=[]
    step_ahead=1

    # create households
    needed = household_options['num_households']
    num = 0
    while needed>0:
        # check if there are enough households
        if num>=len(candidates):
            num_households = len(households)
            print('[Warning] could not find enough households')
            print('[Warning] changed number of households to ' + str(num_households))
        # get household
        household = Household(house_id=candidates.LCLid.iloc[num],
                               block_num=candidates.file.iloc[num])
        # load data with regression options
        household.construct_dataset(lags=get_lags(step_ahead), step_ahead=step_ahead, options=options)
        if len(household.y) > 0:
            households.append(household)
            needed = needed-1
        # search next
        num = num+1

    print('\n[INFO] Connected to ' + str(len(households)) + ' households')
    return households

In [None]:
# number of devices and their group
num_households = 4
group="ACORN-L"
stdorToU="ToU"
household_options = {"num_households":num_households,
                    "group":group,
                    "stdorToU":stdorToU}

# regression options
options = {"dayparts":[],
           "resolution":60,
           "remove_holiday":True,
           "filt_days":['Tuesday'], 
           "replacement_method":'week_before',
           "feat_cols":['hourofd_x', 'hourofd_y', 'dayofy_x', 'dayofy_y', 'temperature_hourly']}
step_ahead=1

households = connect_to_households(household_options)

# construct dataset
for household in households:
    household.construct_dataset(lags=get_lags(step_ahead), step_ahead=step_ahead, options=options)
    household.train_test_split(test_frac=0.25)

In [None]:
len(households)

In [None]:
import multiprocessing as mp

import time

In [None]:
def print_param(model, message=''):
    with np.printoptions(precision=3, suppress=True):
        print(message+' model: \nbias = ' + 
              str(model.state_dict()['linear.bias'].numpy()[0]) +
              ', \nweights = ' + 
              str(model.state_dict()['linear.weight'].numpy().flatten()))
    return
        
def compute_delta(household, args, output):
    # run minibatch SGD and get update in parameters
    model = args[0]
    cur_state_dict = args[1]
    optim = args[2]
    mbsize = args[3]
    tot_samp = args[4]
    
    
    db, dw = household.minibatch_SGD(model=model,optim=optim, mbsize=mbsize, verbose=False)
    # aggregate updates
    delta_bias  = db*household.info['train_samples']/tot_samp
    delta_weight= dw*household.info['train_samples']/tot_samp
    print(delta_weight)
    output.put((delta_weight, delta_bias))            

def _read(q):
    while True:
        delta_weight, delta_bais = q.get(True)
        
def distributed_train_parallel(households, lr, mbsize, total_it, optim_method, **kwargs):
    '''
    optim_method: Adam or SGD
    '''
    random.seed(30)
    np.random.seed(30)
    torch.manual_seed(30)
    # find total samples
    tot_samp = 0
    for household in households:
        tot_samp = tot_samp + household.info['train_samples']
    # initialize model
    if 'init_state_dict' in kwargs:
        set_init = True
        init_state_dict = kwargs.get('init_state_dict')
    else:
        set_init = False
    # create an initial model with random parameters
    in_dim=households[0].info['num_features']
    model = SyNet(torch, in_dim=in_dim, out_dim=1)
    if set_init:
        for key, value in init_state_dict.items():
            model.state_dict()[key].copy_(value)
        print_param(model, 'Initial DA ')
    # create optimizer
    if optim_method=='Adam':
        optim = torch.optim.Adam(params=model.parameters(), lr=lr)
    else:
        if optim_method=='SGD':
            optim = torch.optim.SGD(params=model.parameters(), momentum=0, lr=lr)
        else:
            print('Unsupported optimization method')
            return
        
    time_start = time.time()
    # iterate
    for i in np.arange(math.floor(total_it/mbsize)):
        # initialize param update
        cur_state_dict=copy.deepcopy(model.state_dict())
        cur_weight = model.state_dict()['linear.weight'].numpy().flatten()
        cur_bias = model.state_dict()['linear.bias'].numpy()
        delta_bias = np.zeros(1)
        delta_weight = np.zeros(in_dim)
        # run minibatch SGD for each household
        arguments = [model, cur_state_dict, optim,mbsize, tot_samp]
        results = mp.Queue(0)
        pool = mp.Pool(processes=len(households))
        for household in households:
                pool.apply_async(compute_delta, args = (household, arguments, results))
        
        pool.close()
        pool.join()
        
        #missing a step to collect the weight and bias in the results queue
        
        
        
        # update model
        new_bias = cur_bias + delta_bias/len(households)
        new_bias = torch.tensor(new_bias)
        new_weight = cur_weight + delta_weight/len(households)
        new_weight = torch.tensor(new_weight.reshape((1,len(new_weight))))
        model.state_dict()['linear.weight'].copy_(new_weight)
        model.state_dict()['linear.bias'].copy_(new_bias)
        #print_param(model, 'Iteration ' + str(i) + ' ')

        #print(' ')
        
    time_end = time.time()
    print('elapsed time: ',time_end-time_start)
    # print trained parameters
    print_param(model, 'Trained DA ')
    return model

In [None]:
def print_param(model, message=''):
    with np.printoptions(precision=3, suppress=True):
        print(message+' model: \nbias = ' + 
              str(model.state_dict()['linear.bias'].numpy()[0]) +
              ', \nweights = ' + 
              str(model.state_dict()['linear.weight'].numpy().flatten()))
    return
        
        
def distributed_train(households, lr, mbsize, total_it, optim_method, **kwargs):
    '''
    optim_method: Adam or SGD
    '''
    random.seed(30)
    np.random.seed(30)
    torch.manual_seed(30)
    # find total samples
    tot_samp = 0
    for household in households:
        tot_samp = tot_samp + household.info['train_samples']
    # initialize model
    if 'init_state_dict' in kwargs:
        set_init = True
        init_state_dict = kwargs.get('init_state_dict')
    else:
        set_init = False
    # create an initial model with random parameters
    in_dim=households[0].info['num_features']
    model = SyNet(torch, in_dim=in_dim, out_dim=1)
    if set_init:
        for key, value in init_state_dict.items():
            model.state_dict()[key].copy_(value)
        print_param(model, 'Initial DA ')
    # create optimizer
    if optim_method=='Adam':
        optim = torch.optim.Adam(params=model.parameters(), lr=lr)
    else:
        if optim_method=='SGD':
            optim = torch.optim.SGD(params=model.parameters(), momentum=0, lr=lr)
        else:
            print('Unsupported optimization method')
            return
        
    time_start = time.time()
    # iterate
    for i in np.arange(math.floor(total_it/mbsize)):
        # initialize param update
        cur_state_dict=copy.deepcopy(model.state_dict())
        cur_weight = model.state_dict()['linear.weight'].numpy().flatten()
        cur_bias = model.state_dict()['linear.bias'].numpy()
        delta_bias = np.zeros(1)
        delta_weight = np.zeros(in_dim)
        # run minibatch SGD for each household
        for household in households:
            # run minibatch SGD and get update in parameters
            db, dw = household.minibatch_SGD(model=model,optim=optim, mbsize=mbsize, verbose=False)
            # aggregate updates
            delta_bias  = db*household.info['train_samples']/tot_samp + delta_bias
            delta_weight= dw*household.info['train_samples']/tot_samp + delta_weight
            # reset model
            for key, value in cur_state_dict.items():
                model.state_dict()[key].copy_(value)
        # update model
        print(delta_weight)
        new_bias = cur_bias + delta_bias/len(households)
        new_bias = torch.tensor(new_bias)
        new_weight = cur_weight + delta_weight/len(households)
        new_weight = torch.tensor(new_weight.reshape((1,len(new_weight))))
        model.state_dict()['linear.weight'].copy_(new_weight)
        model.state_dict()['linear.bias'].copy_(new_bias)
        #print_param(model, 'Iteration ' + str(i) + ' ')

        #print(' ')
    # print trained parameters
    time_end = time.time()
    print('elapsed time: ',time_end-time_start)
    print_param(model, 'Trained DA ')
    return model

In [None]:
lr = 0.1
mbsize = 2
total_it = 400

# create an initial model with random parameters
model = SyNet(torch, in_dim=household.info['num_features'], out_dim=1)
init_state_dict = copy.deepcopy(model.state_dict())
  
# set training data
households[0].train_test_split(test_frac=0.25)    

# METHOD 1
household = households[0]
print('\nMethod 1')
household.fit_personal_model(method='Adam', lr=lr, iterations=total_it, init_state_dict=init_state_dict)
model_lr = household.personal_lr
print_param(model_lr, 'personal')

# METHOD 2
print('\nMethod 2')
model_da = distributed_train_parallel([household], optim_method='Adam', 
                             lr=lr, mbsize=mbsize, total_it=total_it, 
                             init_state_dict=init_state_dict)

# Compare
tol = 1e-3
dif_b = np.abs(model_da.state_dict()['linear.bias'].numpy()-model_lr.state_dict()['linear.bias'].numpy())
dif_w = np.abs(model_da.state_dict()['linear.weight'].numpy().flatten()-model_lr.state_dict()['linear.weight'].numpy().flatten())
if np.all(dif_b<tol) and np.all(dif_w<tol):
    print('\nSingle household check was successful.')
else:
    print('\nSingle household check failed.')

In [None]:
lr = 0.1
mbsize = 2
total_it = 400

# create an initial model with random parameters
model = SyNet(torch, in_dim=household.info['num_features'], out_dim=1)
init_state_dict = copy.deepcopy(model.state_dict())
  
# set training data
households[0].train_test_split(test_frac=0.25)    

In [None]:
lr = 0.1
mbsize = 2
total_it = 400

# create an initial model with random parameters
model = SyNet(torch, in_dim=household.info['num_features'], out_dim=1)