## Goal

In [15]:
# #option 1 : group of pipelines with 3 levels
# ML_pipelines_A = make_agent_pipelines([PCA(), KNN()],
#                                 [StandardScaler(),RobustScaler()],
#                                 [LinearRegression(),ANN()], parameters)

# #option 2 : multi pipelines of single level
# ML_pipelines_B = make_agent_pipelines([CNN(),BCNN(),ANN()], parameters)

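# example of parameters for pipelines: one entry per pipeline level, each entry listing the hyperparameter grids for that level's components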
# parameters = ([{"n_components":[1,2,3,4,5]}, {"n":[4]}],
#               [],
#               [0,{"dimensions":[120,233,345,666]}])
# aggregated_results = run_experiment([data_agent_1, data_agent_2], [ML_pipelines_A,ML_pipelines_B,ML_pipeline_C])


In [16]:
import os
# Switch the working directory to the local agentMET4FOF checkout before the
# `agentMET4FOF` import below.
# NOTE(review): this is a hard-coded absolute path on one machine's F: drive;
# on any other machine it has to be edited before the notebook is run.
path = "F:/PhD Research/Github/develop_ml_experiments_met4fof/agentMET4FOF"
os.chdir(path)

from agentMET4FOF.agents import AgentMET4FOF, AgentNetwork, MonitorAgent

from sklearn.model_selection import KFold
from sklearn import datasets
from sklearn.metrics import f1_score

from sklearn.neighbors import KNeighborsClassifier as KNN
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import ParameterGrid
from pprint import pprint
import copy

In [17]:
class ANN:
    """Minimal stand-in class exposing a ``transform`` method."""

    def transform(self):
        """Return a fixed dummy value; takes no input."""
        placeholder_output = 12333
        return placeholder_output

In [18]:
# One library estimator and one instance of the local stand-in class.
method_pca = PCA()
method_ann = ANN()

# Show the type reported for a bound method of each object, one per line.
print(type(method_pca.fit), type(method_ann.transform), sep="\n")

<class 'method'>
<class 'method'>


In [19]:
class TransformerAgent(AgentMET4FOF):
    """Agent wrapping a single pipeline step.

    The wrapped ``method`` may be an object exposing ``fit`` / ``transform`` /
    ``predict``, or a plain callable. Messages on the 'train' channel fit a
    deep copy of the step (when it has ``fit``); the fitted copy is stored per
    upstream chain and reused for later messages carrying the same chain.
    """

    def init_parameters(self, method=None, **kwargs):
        """Store the wrapped step and set extra keyword arguments as attributes.

        method: object with fit/transform/predict methods, or a plain callable.
        kwargs: additional attributes to set on this agent.
        """
        self.method = method
        # fitted copies of `method`, keyed by the chain string of the sender
        self.models = {}
        # for single functions passed to the method:
        # set_attr receives the attributes as keyword arguments, so forward
        # them by keyword instead of as positional (name, value) pairs.
        if kwargs:
            self.set_attr(**kwargs)

    def on_received_message(self, message):
        """Fit and/or apply the wrapped step to the message and forward the result.

        Returns 0 (without sending anything) after fitting a step that has
        ``predict`` on the 'train' channel; otherwise returns None.
        """
        # update chain
        chain = self.get_chain(message)

        # if data is dict, with 'x' in keys
        data = message['data']
        if type(data) == dict and 'x' in data.keys():
            X = data['x']
            # targets are optional: a payload without 'y' is treated as unlabeled
            Y = data.get('y')
        else:
            X = data
            Y = None

        # if it is train, and has fit function, then fit it first.
        if message['channel'] == 'train' and hasattr(self.method, 'fit'):
            # Keep the fitted copy itself rather than fit()'s return value, so
            # steps whose fit() does not return the estimator still work.
            fitted = copy.deepcopy(self.method)
            fitted.fit(X, Y)
            self.models[chain] = fitted
            # if it is a base model, don't send out the predicted train results
            if hasattr(self.method, 'predict'):
                return 0

        # proceed in transforming or predicting, preferring the copy fitted
        # for this chain and falling back to the unfitted step
        model = self.models.get(chain, self.method)
        if hasattr(self.method, 'transform'):
            results = model.transform(X)
        elif hasattr(self.method, 'predict'):
            results = model.predict(X)
        else:  # it is a plain function
            results = self.method(X)

        # send out, extending the chain with this agent's name
        chain = chain + "->" + self.name
        if hasattr(self.method, 'predict'):
            self.send_output({'x': X, 'y_true': Y, 'y_pred': results, 'chain': chain}, channel=message['channel'])
        else:
            self.send_output({'x': results, 'y': Y, 'chain': chain}, channel=message['channel'])

    def get_chain(self, message):
        """Return the chain string carried in the payload, else the sender name."""
        data = message['data']
        if type(data) == dict and 'chain' in data.keys():
            return data['chain']
        return message['from']

In [20]:
class EvaluationAgent(AgentMET4FOF):
    """Agent that scores predictions with a metric function and forwards the score."""

    def init_parameters(self, method=None, **kwargs):
        """Keep the metric callable and the keyword arguments passed to it on each call."""
        self.eval_params = kwargs
        self.method = method

    def on_received_message(self, message):
        """Score ``y_pred`` against ``y_true`` for every message not on the 'train' channel."""
        # nothing is evaluated for training data
        if message['channel'] == 'train':
            return

        payload = message['data']
        score = self.method(payload['y_true'], payload['y_pred'], **self.eval_params)

        # label the score with the upstream chain when one is present,
        # otherwise with the metric function's name
        if type(payload) == dict and 'chain' in payload.keys():
            label = payload['chain']
        else:
            label = self.method.__name__
        self.send_output({label: score})


In [21]:
class DataStreamAgent(AgentMET4FOF):
    """Agent that emits K-fold train/test splits of a dataset.

    For every fold, the training part is sent on the 'train' channel and the
    held-out part on the 'test' channel.
    """

    def init_parameters(self, data=None, n_splits=5, shuffle=True, random_state=None):
        """Store the dataset and configure the K-fold splitter.

        data: object exposing ``data`` (features) and ``target`` (labels),
            both indexed with the index arrays produced by the splitter.
        n_splits: number of folds (default 5, the previously fixed value).
        shuffle: whether to shuffle before splitting (default True, as before).
        random_state: seed for the shuffle; None (the default) keeps the
            previous unseeded behaviour, an int makes the folds reproducible.
        """
        self.data = data
        self.kf = KFold(n_splits=n_splits, shuffle=shuffle, random_state=random_state)

    def agent_loop(self):
        """While in the "Running" state, send every fold once, then switch to "Stop"."""
        if self.current_state == "Running":
            features, targets = self.data.data, self.data.target
            for train_index, test_index in self.kf.split(features):
                x_train, x_test = features[train_index], features[test_index]
                y_train, y_test = targets[train_index], targets[test_index]
                self.send_output({'x': x_train, 'y': y_train}, channel="train")
                self.send_output({'x': x_test, 'y': y_test}, channel="test")
            # all folds have been sent; stop so they are not emitted again
            self.current_state = "Stop"
            

In [22]:
class AgentPipeline:
    """Multi-level pipeline of agents in which every level feeds the next.

    ``self.pipeline`` is a list of levels; each level is a list of agents.
    Every agent of one level is bound to every agent of the following level.
    """

    def __init__(self, agentNetwork=None,*argv, hyperparameters=None):
        """Create the agents for every level and wire consecutive levels.

        agentNetwork: network object whose ``add_agent`` is used to create agents.
        *argv: one entry per pipeline level; an entry is either a list of
            components or a single component (function, class, or agent).
        hyperparameters: optional list with one entry per level. An entry is
            either a dict (grid applied to every component of that level) or a
            list with one grid per component. Each grid combination is spawned
            as its own agent.
        """
        self.hyperparameters = hyperparameters
        self.pipeline = self.make_agent_pipelines(agentNetwork, argv, hyperparameters)

    def make_transform_agent(self,agentNetwork, pipeline_component=None, hyperparameters={}):
        """Return an agent for one component.

        Functions/methods are wrapped in a TransformerAgent and receive the
        hyperparameters as keyword arguments of ``init_parameters``; existing
        agents are re-initialised with the hyperparameters; anything else is
        called with the hyperparameters and the result is wrapped in a
        TransformerAgent.
        """
        component_kind = type(pipeline_component).__name__
        if ("function" in component_kind) or ("method" in component_kind):
            transform_agent = agentNetwork.add_agent(pipeline_component.__name__+"_Agent",agentType=TransformerAgent)
            transform_agent.init_parameters(pipeline_component,**hyperparameters)
        elif issubclass(type(pipeline_component), AgentMET4FOF):
            transform_agent = pipeline_component
            transform_agent.init_parameters(**hyperparameters)
        else: #class objects with fit and transform
            transform_agent = agentNetwork.add_agent(pipeline_component.__name__+"_Agent",agentType=TransformerAgent)
            transform_agent.init_parameters(pipeline_component(**hyperparameters))
        return transform_agent

    @staticmethod
    def _grid_spec(hyper_param_level, function_id):
        """Return the grid spec for one component, or None to use defaults."""
        # a dict applies to every component of the level (even when empty)
        if type(hyper_param_level) == dict:
            return hyper_param_level
        # a list shorter than the component list means "defaults" for the rest
        if function_id < len(hyper_param_level):
            candidate = hyper_param_level[function_id]
            try:
                if len(candidate) > 0:
                    return candidate
            except TypeError:
                # unsized placeholder such as 0 or None: no hyperparameters
                pass
        return None

    def _make_level_agents(self, agentNetwork, pipeline_component, hyper_param_level):
        """Create and return the list of agents for a single pipeline level."""
        #non list, single function, class, or agent: one agent with defaults
        if type(pipeline_component) != list:
            return [self.make_transform_agent(agentNetwork,pipeline_component)]

        level_agents = []
        for function_id, pipeline_function in enumerate(pipeline_component):
            #no hyperparameters usage at all, proceed with defaults
            if hyper_param_level is None:
                level_agents.append(self.make_transform_agent(agentNetwork,pipeline_function))
                continue

            print(hyper_param_level)
            grid_spec = self._grid_spec(hyper_param_level, function_id)
            if grid_spec is not None:
                #create an agent for every unique hyperparameter combination
                param_grid = list(ParameterGrid(grid_spec))
                print("MAKING {} AGENTS WITH HYPERPARAMS: {}".format(pipeline_function.__name__, param_grid))
                for param in param_grid:
                    level_agents.append(self.make_transform_agent(agentNetwork,pipeline_function,param))
            else:
                print("MAKING {} AGENT WITH DEFAULT HYPERPARAMS".format(pipeline_function.__name__))
                level_agents.append(self.make_transform_agent(agentNetwork,pipeline_function))
        return level_agents

    def make_agent_pipelines(self,agentNetwork=None, argv=[], hyperparameters=None):
        """Build all levels and bind each level to the next.

        Returns the list of levels, or -1 (after printing a message) when no
        agent network is given or the per-level hyperparameters cannot be
        looked up.
        """
        if agentNetwork is None:
            print("You need to pass an agent network as parameter to add agents")
            return -1

        # a single non-list entry is wrapped so it is handled as one level
        if hyperparameters is not None and len(hyperparameters) == 1:
            if type(hyperparameters[0]) != list:
                hyperparameters = [hyperparameters]

        agent_pipeline = []
        for pipeline_level, pipeline_component in enumerate(argv):
            if hyperparameters is None:
                hyper_param_level = None
            else:
                try:
                    if pipeline_level < len(hyperparameters):
                        hyper_param_level = hyperparameters[pipeline_level]
                    else:
                        # levels beyond the given hyperparameters get an empty grid
                        hyper_param_level = {}
                except (TypeError, KeyError, IndexError):
                    print("Error getting hyperparameters mapping")
                    return -1
            agent_pipeline.append(self._make_level_agents(agentNetwork, pipeline_component, hyper_param_level))

        #now bind the agents on one level to the next levels, for every pipeline level
        for level_agents, next_level_agents in zip(agent_pipeline, agent_pipeline[1:]):
            for agent in level_agents:
                for agent_next in next_level_agents:
                    agent.bind_output(agent_next)
        return agent_pipeline

    @staticmethod
    def _output_targets(output_agent):
        """Return the list of agents that `output_agent` stands for."""
        # matched by class name rather than isinstance, as in the original check
        if "AgentPipeline" in str(type(output_agent).__name__):
            return output_agent.pipeline[0]
        if type(output_agent) == list:
            return output_agent
        return [output_agent]

    def bind_output(self, output_agent):
        """Bind every agent of the last level to an agent, a list of agents,
        or the first level of another pipeline."""
        targets = self._output_targets(output_agent)
        for agent in self.pipeline[-1]:
            for next_agent in targets:
                agent.bind_output(next_agent)

    def unbind_output(self, output_agent):
        """Undo `bind_output` for an agent, a list of agents, or another pipeline."""
        targets = self._output_targets(output_agent)
        for agent in self.pipeline[-1]:
            for next_agent in targets:
                agent.unbind_output(next_agent)

    def agents(self):
        """Return the agent names, as one list per pipeline level."""
        return [[agent.get_attr('name') for agent in level] for level in self.pipeline]

In [23]:
# Create the agent network to which the pipeline, data stream, evaluation and
# monitor agents in the cells below are added.
agentNetwork= AgentNetwork()

In [25]:
# One entry per pipeline level: the scalers use defaults, PCA is expanded into
# one agent per n_components value; the classifier level has no entry.
hyperparameters = [
    [],
    [{"n_components": [1, 2, 3]}],
]

# Three-level pipeline: scaling -> dimensionality reduction -> classification.
ML_Agent_pipelines_B = AgentPipeline(
    agentNetwork,
    [StandardScaler, RobustScaler],
    [PCA],
    [LogisticRegression, SVC],
    hyperparameters=hyperparameters,
)

[]
MAKING StandardScaler AGENT WITH DEFAULT HYPERPARAMS
[]
MAKING RobustScaler AGENT WITH DEFAULT HYPERPARAMS
[{'n_components': [1, 2, 3]}]
MAKING PCA AGENTS WITH HYPERPARAMS: [{'n_components': 1}, {'n_components': 2}, {'n_components': 3}]
{}
MAKING LogisticRegression AGENTS WITH HYPERPARAMS: [{}]
{}
MAKING SVC AGENTS WITH HYPERPARAMS: [{}]


In [26]:
# Add the data source, the evaluation agent and the monitor agent to the network.
datastream_agent = agentNetwork.add_agent(agentType=DataStreamAgent)

evaluation_agent = agentNetwork.add_agent(agentType=EvaluationAgent)

monitor_agent = agentNetwork.add_agent(agentType=MonitorAgent)

# Stream the iris dataset and score predictions with the micro-averaged F1 score;
# the scores are forwarded to the monitor agent.
datastream_agent.init_parameters(datasets.load_iris())
evaluation_agent.init_parameters(f1_score,average='micro')
evaluation_agent.bind_output(monitor_agent)



# Every agent of the pipeline's last level sends its output to the evaluation agent.
ML_Agent_pipelines_B.bind_output(evaluation_agent)


# NOTE(review): this passes the AgentPipeline object itself to an agent's
# bind_output; whether the agent class accepts a pipeline here is not visible
# in this notebook - confirm.
datastream_agent.bind_output(ML_Agent_pipelines_B)

In [27]:
# Switch the network to the running state.
# NOTE(review): the recorded output of this cell shows
# "Agent must be running to safely execute methods!" RuntimeErrors for
# LogisticRegression_Agent_1 and SVC_Agent_1 - investigate before relying on this run.
agentNetwork.set_running_state()

----------------------------------------------------
 EXCEPTION <class 'RuntimeError'>: Agent must be running to safely execute methods!
 Extended stacktrace follows (most recent call last)
----------------------------------------------------
File "f:\ProgramData\Anaconda3\lib\site-packages\osbrain\proxy.py", line 219, in Proxy._pyroInvoke
Source code:
    methodname, args, kwargs, flags, objectId)
----------------------------------------------------
File "f:\ProgramData\Anaconda3\lib\site-packages\osbrain\proxy.py", line 262, in Proxy._remote_call
Source code:
    flags=flags, objectId=objectId)
Local values:
    __class__ = <class 'osbrain.proxy.Proxy'>
    args = ()
    flags = 0
    kwargs = {'current_state': 'Running'}
    methodname = 'set_attr'
    objectId = None
    safe_args = ['set_attr']
    self = <osbrain.proxy.Proxy at 0x45fc761eb8; connected IPv4; for PYRONAME:LogisticRegression_Agent_1@127.0.0.1:3333>
        self._pyroUri = <Pyro4.core.URI at 0x45fc783f28; PYRONAME:Lo

Agent must be running to safely execute methods!
----------------------------------------------------
 EXCEPTION <class 'RuntimeError'>: Agent must be running to safely execute methods!
 Extended stacktrace follows (most recent call last)
----------------------------------------------------
File "f:\ProgramData\Anaconda3\lib\site-packages\osbrain\proxy.py", line 219, in Proxy._pyroInvoke
Source code:
    methodname, args, kwargs, flags, objectId)
----------------------------------------------------
File "f:\ProgramData\Anaconda3\lib\site-packages\osbrain\proxy.py", line 262, in Proxy._remote_call
Source code:
    flags=flags, objectId=objectId)
Local values:
    __class__ = <class 'osbrain.proxy.Proxy'>
    args = ()
    flags = 0
    kwargs = {'current_state': 'Running'}
    methodname = 'set_attr'
    objectId = None
    safe_args = ['set_attr']
    self = <osbrain.proxy.Proxy at 0x45fc7834a8; connected IPv4; for PYRONAME:SVC_Agent_1@127.0.0.1:3333>
        self._pyroUri = <Pyro4.cor

Agent must be running to safely execute methods!
SET STATE:   Running


In [28]:
# agentNetwork.shutdown()