In [29]:
import numpy
import tensorflow as tf
from tensorflow import keras
import random
import pandas as pd
import numpy as np

# Thresholds applied to the averaged perceptron output of a distance sensor.
distance_definitions = {'near':.95,'medium':.7} # averaged output by perceptron
# Thresholds applied to a wheel reading.
wheel_definitions = {'slow':.5,'medium':.8}
# Integer codes of the actions the main net chooses between.
actions = {'left':0,'right':1,'forward':2}


def near_sensor(x):
    """Bucket a sensor reading into a distance class.

    Returns 0 (near) when ``x >= distance_definitions['near']``, 1 (medium)
    when ``x >= distance_definitions['medium']`` and 2 (far) otherwise.
    """
    if x >= distance_definitions['near']:
        return 0
    if x >= distance_definitions['medium']:
        return 1
    return 2


def measure_wheel(x):
    """Bucket a wheel reading into a speed class.

    Returns 0 when ``x < wheel_definitions['slow']``, 1 when
    ``x < wheel_definitions['medium']`` and 2 otherwise.

    The top bucket previously returned 0, which made readings at or above the
    'medium' threshold indistinguishable from the slow class.
    """
    if x < wheel_definitions['slow']:
        return 0
    if x < wheel_definitions['medium']:
        return 1
    return 2


def turn_away(x):
    """Pick an action code for a reading: 'left' when it is near, else 'forward'."""
    if near_sensor(x) == 0:
        return actions['left']
    return actions['forward']


#near, medium, far - 0,1,2
# To implement next - when torque is greater, make sensor data change
# Default number of synthetic samples produced by the generator functions.
generated_points = 1000

def generate_sensor_data(evaluation, n_points=None):
    """Generate synthetic sensor readings and label each of them.

    Readings are drawn with ``random.triangular(0, 2.9, 0)``, i.e. in the
    range [0, 2.9] with the mode at 0, so small values are the most frequent.

    Args:
        evaluation: callable mapping a single reading to its target label.
        n_points: number of readings to generate. When omitted, the
            module-level ``generated_points`` is used.

    Returns:
        A DataFrame with column 'x' (the readings) and column 'y' (the labels).
    """
    # randomly generate voltage value
    # perceptron tends to over-estimate how close something is to compensate
    # for any signal loss due to noise/angle
    # skew probability to produce smaller numbers more often, to balance out this trend
    if n_points is None:
        n_points = generated_points
    readings = [random.triangular(0, 2.9, 0) for _ in range(n_points)]
    targets = [evaluation(reading) for reading in readings]
    # Building from a dict keeps the column names even for an empty sample.
    return pd.DataFrame({'x': readings, 'y': targets})


def generate_decisions(evaluation, turnaway, n_points=None):
    """Generate synthetic sensor readings with a decision and a label each.

    Readings are drawn with ``random.triangular(0, 2.9, 0)``, i.e. in the
    range [0, 2.9] with the mode at 0.

    Args:
        evaluation: callable mapping a single reading to its target label.
        turnaway: callable mapping a single reading to a decision code.
        n_points: number of readings to generate. When omitted, the
            module-level ``generated_points`` is used.

    Returns:
        A DataFrame with columns 'x' (readings), 'decision' and 'y' (labels),
        in that order.
    """
    if n_points is None:
        n_points = generated_points
    readings = [random.triangular(0, 2.9, 0) for _ in range(n_points)]
    targets = [evaluation(reading) for reading in readings]
    decisions = [turnaway(reading) for reading in readings]
    # Building from a dict keeps the column names even for an empty sample.
    return pd.DataFrame({'x': readings, 'decision': decisions, 'y': targets})

# Two independent synthetic samples for the front sensor: one with distance
# labels only, one that additionally carries the turn-away decision.
sensor_front_data = generate_sensor_data(near_sensor)
sensor_front_data_decision = generate_decisions(near_sensor, turn_away)

# wheel_lf_data = generate_sensor_data(evaluation=measure_wheel)
# wheel_rf_data = generate_sensor_data(evaluation=measure_wheel)
# wheel_lb_data = generate_sensor_data(evaluation=measure_wheel)
# wheel_rb_data = generate_sensor_data(evaluation=measure_wheel)
# from keras.utils.np_utils import to_categorical

class NeuralNet():
    """Thin wrapper around a compiled Keras classifier and its training data.

    Attributes:
        X: training inputs handed to the constructor.
        y: training targets handed to the constructor.
        output: number of units in the softmax output layer.
        name: label of this network (used by callers when logging).
        model: the compiled Keras model; subclasses replace it with their own.
    """

    def __init__(self,x,y,output,name):
        """Store the data and build the default Dense(20, relu) -> softmax model."""
        self.X = x
        self.y = y
        self.output=output
        self.name=name
        model = keras.models.Sequential()
        model.add(keras.layers.Dense(20,activation='relu')) # fully connected hidden layer
        model.add(keras.layers.Dense(output,activation='softmax'))

        model.compile(
            loss=keras.losses.SparseCategoricalCrossentropy(),
            optimizer=keras.optimizers.SGD(),
            metrics=['accuracy']
        )

        self.model=model

    def train(self,**args):
        """No-op hook; subclasses implement the actual fitting."""
        pass

    def load_model(self,model):
        """Replace the wrapped model with ``model``."""
        self.model = model

    def predict(self,test,**args):
        """Return the raw output of the wrapped model for ``test``."""
        predictions=self.model.predict(test)
        return(predictions)

    def cat_predict(self,test,**args):
        """Return predicted class indices for ``test``.

        ``Sequential.predict_classes`` no longer exists in current TensorFlow
        releases, so the classes are derived from ``predict`` the same way
        that method did: argmax over the last axis for multi-unit outputs, a
        0.5 threshold for a single-unit output.
        """
        probabilities = np.asarray(self.model.predict(test))
        if probabilities.shape[-1] > 1:
            return probabilities.argmax(axis=-1)
        return (probabilities > 0.5).astype('int32')

    def save(self,path):
        """Save the wrapped model to ``path``, overwriting an existing file."""
        self.model.save(path,overwrite=True)

class InfraredNet(NeuralNet):
    """Per-sensor classifier: batch normalisation feeding a softmax layer."""

    def __init__(self,input,target,output,name):
        """Store the data via the base class, then swap in this net's own model."""
        # The base initialiser records X, y, output and name; the default
        # model it builds is replaced at the end of this method.
        super().__init__(input,target,output,name)

        network = keras.models.Sequential([
            keras.layers.BatchNormalization(),
            keras.layers.Dense(output,activation='softmax'),
        ])
        network.compile(
            optimizer=keras.optimizers.SGD(),
            loss=keras.losses.SparseCategoricalCrossentropy(),
            metrics=['accuracy'],
        )
        self.model = network

    def train(self):
        """Fit on the stored data for 30 epochs with a batch size of 32."""
        self.model.fit(self.X, self.y, batch_size=32, epochs=30)

    def mini_train(self,epochs):
        """Fit on the stored data for the given number of epochs."""
        self.model.fit(self.X, self.y, epochs=epochs)


class MainNet(NeuralNet):
    """Decision network: Dense(20, relu) followed by a softmax layer."""

    def __init__(self,input,target,output,name):
        """Store the data via the base class, then build and compile the model."""
        super().__init__(input,target,output,name)
        model = keras.models.Sequential()
        model.add(keras.layers.Dense(20,activation='relu')) # fully connected hidden layer
        model.add(keras.layers.Dense(output,activation='softmax'))

        model.compile(
            loss=keras.losses.SparseCategoricalCrossentropy(),
            optimizer=keras.optimizers.SGD(),
            metrics=['accuracy']
        )

        self.model=model

    def train(self,epochs=30):
        """Print the stored data, then fit on it with shuffling and batch size 32.

        Args:
            epochs: number of training epochs (default 30).
        """
        print("x: ",self.X)
        print("y: ",self.y)
        self.model.fit(self.X,self.y,epochs=epochs,batch_size=32,shuffle=True)

    def mini_train(self,epochs):
        """Fit on the stored data for the given number of epochs.

        Mirrors ``InfraredNet.mini_train``. ``TreeNet.train`` calls this method
        on its main net whenever it is given a non-default epoch count, which
        previously failed because this class did not define it.
        """
        self.model.fit(self.X,self.y,epochs=epochs)

class TreeNet():
    """Two-stage model: one net per sensor, whose outputs feed a main net.

    Attributes:
        sensors: sequence of sensor nets; each must offer ``predict`` and,
            when sensor training is requested, ``train`` plus writable
            ``X``/``y``/``name`` attributes.
        mainnet: net trained on the concatenated sensor outputs.
    """

    def __init__(self,sensors,mainnet=None):
        self.sensors = sensors
        self.mainnet = mainnet

    def _sensor_outputs(self, X):
        """Run every sensor net on its own input and join the outputs column-wise."""
        outputs = [model.predict(sensor_data)
                   for model, sensor_data in zip(self.sensors, X)]
        return np.concatenate(outputs, axis=1)

    # Train all models
    def train(self,data,y,train_sensors=True,epochs=50):
        """Optionally train the sensor nets, then train the main net on their outputs.

        Args:
            data: one ``(inputs, targets)`` pair per sensor net, in the same
                order as ``self.sensors``.
            y: targets for the main net.
            train_sensors: when true, each sensor net is refitted on its pair
                before its predictions are taken.
            epochs: 50 (the default) means "use the main net's own default
                training"; any other value is passed on to the main net.
        """
        all_predictions = []

        # Training and predicting stay interleaved per sensor: the same net
        # object may appear several times in ``self.sensors``.
        for model, sensor_data in zip(self.sensors, data):
            if train_sensors:
                print("Training " + model.name)
                model.X = sensor_data[0]
                model.y = sensor_data[1]
                model.train()
            all_predictions.append(model.predict(sensor_data[0]))

        training_data = np.concatenate(all_predictions, axis=1)
        print(training_data.shape)
        self.mainnet.X = training_data
        self.mainnet.y = y
        print("Training mainnet...")
        if epochs == 50:
            self.mainnet.train()
        elif hasattr(self.mainnet, "mini_train"):
            self.mainnet.mini_train(epochs=epochs)
        else:
            # Main nets without mini_train take the epoch count through train().
            self.mainnet.train(epochs=epochs)

    def standardize_data(self,X):
        """Wrap every element of ``X`` in a one-element list."""
        return [[x] for x in X]

    def standardize_data_all(self,X):
        """Apply the one-element wrapping to every sequence in ``X``."""
        return [[[sub_x] for sub_x in x] for x in X]

    # Use predictions of models to train main net
    def predict(self,X):
        """Return the main net's raw output for per-sensor inputs ``X``."""
        return self.mainnet.predict(self._sensor_outputs(X))

    def predict_class(self,X):
        """Return the main net's predicted classes for per-sensor inputs ``X``."""
        return self.mainnet.cat_predict(self._sensor_outputs(X))


# Training set for the infrared net: synthetic readings and their distance labels,
# both kept as single-column 2-D arrays.
train_x = sensor_front_data_decision[['x']].values
train_y = sensor_front_data_decision[['y']].values

# One infrared net is trained once and then shared by all seven sensor slots.
infr_net = InfraredNet(input=train_x, target=train_y, output=3, name='infrared')
infr_net.train()
infrared_net_list = [infr_net] * 7

# The main net receives its data later, when the tree net is trained.
main_net = MainNet(input=None, target=None, output=3, name="main_net")

# Tree net: the sensor nets feed the main net, which is trained with manually collected data.
tree_net = TreeNet(infrared_net_list, mainnet=main_net)

# data reformatting -------------------------------------------------------------------------------------------
# problem: "[0,0,0,0,0,0,0]" keeps being interpreted as a string instead of an array;
# either convert it to the right type or change how it is stored
from ast import literal_eval
from data_process import get_float_array
from sklearn.model_selection import train_test_split

df = pd.read_csv("dataset.csv")
df['action'] = list(map(int, df['action']))

sensor_columns = ['s0','s1','s2','s3','s4','s5','s6']
X_train, X_test, y_train, y_test = train_test_split(
    df[sensor_columns], df[['action']], test_size=0.30, random_state=42
)
truth = y_train.values
truth_test = y_test.values

# Training split: one single-column array per sensor, each with its distance labels.
train_inputs = [X_train[[column]].values for column in sensor_columns]
train_labels = [[near_sensor(reading) for reading in readings] for readings in train_inputs]
x0, x1, x2, x3, x4, x5, x6 = train_inputs
y0, y1, y2, y3, y4, y5, y6 = train_labels

train = list(zip(train_inputs, train_labels))
# The sensor nets are already trained, so only the main net is fitted here.
tree_net.train(data=train, y=truth, train_sensors=False)

# Test split: the same per-sensor layout; x0..x6 / y0..y6 are rebound to the test values.
test = [X_test[[column]].values for column in sensor_columns]
x0, x1, x2, x3, x4, x5, x6 = test
y0, y1, y2, y3, y4, y5, y6 = [
    [near_sensor(reading) for reading in readings] for readings in test
]


Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
(1468, 21)
Training mainnet...
x:  [[9.6889919e-01 2.3421163e-02 7.6796045e-03 ... 3.7024252e-04
  1.5705751e-02 9.8392397e-01]
 [9.9644780e-01 3.0627560e-03 4.8939226e-04 ... 3.7024252e-04
  1.5705751e-02 9.8392397e-01]
 [9.6546483e-01 2.5789259e-02 8.7460121e-03 ... 9.9483973e-01
  4.3670083e-03 7.9323776e-04]
 ...
 [9.9447638e-01 4.6576397e-03 8.6597970e-04 ... 8.8870114e-01
  7.4680425e-02 3.6618497e-02]
 [1.7759772e-03 3.4847151e-02 9.6337688e-01 ... 9.9755019e-01
  2.1480024e-03 3.0182581e-04]
 [8.8761869e-04 2.4120489e-02 9.7499192e-01 ... 3.7024252e-04
  1.5705749e-02 9.8392397e-01]]
y:  [[1]
 [0]
 [0]
 ...
 [1]
 [2]
 [0]]
Epoch 1/30

In [30]:

from sklearn.metrics import accuracy_score
# Class predictions of the whole tree (sensor nets -> main net) for the held-out sensor columns.
predictions=tree_net.predict_class(test)
# The accuracy against truth_test is computed in the following cell.


In [31]:
# Share of held-out rows whose predicted class equals the recorded action.
accuracy_score(truth_test,predictions)


0.7619047619047619