In [None]:
import pandas as pd
import numpy as np
import math
import graphviz

In [None]:
# Colab-only: mount Google Drive at /content/drive so files stored there can be read.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Location of the airfoil self-noise CSV under the /content/drive mount point.
path="/content/drive/MyDrive/MachineLearning/13-03-2023/airfoil_self_noise.csv"

In [None]:
# Read the CSV at `path` into a DataFrame; the bare name below makes it the cell's displayed output.
data=pd.read_csv(path)
data

Unnamed: 0,x0,x1,x2,x3,x4,y
0,800,0.0,0.3048,71.3,0.002663,126.201
1,1000,0.0,0.3048,71.3,0.002663,125.201
2,1250,0.0,0.3048,71.3,0.002663,125.951
3,1600,0.0,0.3048,71.3,0.002663,127.591
4,2000,0.0,0.3048,71.3,0.002663,127.461
...,...,...,...,...,...,...
1498,2500,15.6,0.1016,39.6,0.052849,110.264
1499,3150,15.6,0.1016,39.6,0.052849,109.254
1500,4000,15.6,0.1016,39.6,0.052849,106.604
1501,5000,15.6,0.1016,39.6,0.052849,106.224


In [None]:
class Node:
  """One vertex of a tree.

  Attributes:
    name: label of the node (e.g. the feature tested); may be None.
    value: payload of the node (e.g. a threshold or a prediction).
    children: child nodes, in insertion order.
    label: edge labels, appended independently of ``children``.
  """

  def __init__(self, name, value):
    self.name, self.value = name, value
    self.children, self.label = [], []

  def add_child(self, node):
    """Attach ``node`` as the next child."""
    self.children += [node]

  def add_label(self, label):
    """Record ``label`` as the next edge label."""
    self.label += [label]

  def print_node(self):
    """Print this node, then every descendant, in pre-order.

    The name is shown when it is truthy; otherwise the value is shown.
    """
    shown = self.name if self.name else self.value
    print(shown)
    for child in self.children:
      child.print_node()

In [None]:

class RegressionTree:
    """Binary regression tree grown greedily by minimising the children's
    sum of squared errors (SSE).

    Each internal node stores the splitting column in ``name`` and the
    threshold in ``value``; rows with ``feature <= threshold`` go to
    ``children[0]``, the rest to ``children[1]``. Leaves have ``name`` set to
    ``None`` and carry the prediction in ``value``.

    The column chosen at a node is dropped before recursing, so every feature
    is used at most once on any root-to-leaf path.
    """

    def __init__(self, dataset, target_feature):
        """Build the tree immediately from ``dataset``.

        Args:
            dataset: pandas DataFrame holding the feature columns and the target.
            target_feature: name of the target column in ``dataset``.
        """
        self.root = self.construct(dataset, target_feature)

    def construct(self, dataset, target_feature):
        """Recursively build the subtree for ``dataset`` and return its root Node.

        A leaf is returned when the partition is empty (value ``None``), when
        the target is constant (that value), or when no split lowers the SSE
        (the target mean) -- including when no feature columns remain or every
        remaining feature is constant.
        """
        # An empty partition carries no information: leaf without a value.
        if len(dataset) == 0:
            return Node(None, None)

        target = dataset[target_feature]
        # Pure partition: predict the single observed value.
        if len(target.unique()) == 1:
            return Node(None, target.iloc[0])
        # Only the target column is left: predict the mean.
        if len(dataset.columns) == 1:
            return Node(None, target.mean())

        # get_best_split() reads the target from the last column, so make sure
        # it really is last even if the caller ordered the columns differently.
        if dataset.columns[-1] != target_feature:
            ordered = [c for c in dataset.columns if c != target_feature]
            dataset = dataset[ordered + [target_feature]]

        num_features = len(dataset.columns) - 1
        best = self.get_best_split(dataset, num_features)

        # No admissible split, or none that improves on the parent: make a leaf.
        # (Previously this left an internal node without children, which made
        # predict() fail, and an empty result raised KeyError.)
        if not best or not best["var_red"] > 0:
            return Node(None, target.mean())

        best_feature = dataset.columns[best["feature_index"]]
        threshold = best["threshold"]
        root_node = Node(best_feature, threshold)

        partitions = self.split(dataset, best["feature_index"], threshold)
        for i, subset in enumerate(partitions):
            # The feature just used is removed before growing the child.
            child = self.construct(subset.drop(best_feature, axis=1), target_feature)
            root_node.add_label(str(i))
            root_node.add_child(child)
        return root_node

    def get_best_split(self, dataset, num_features):
        """Find the (feature, threshold) pair with the lowest children SSE.

        The first ``num_features`` columns are treated as features and the
        last column as the target. Only splits that leave both sides non-empty
        are considered.

        Returns:
            dict with keys ``"feature_index"``, ``"threshold"`` and
            ``"var_red"`` (parent SSE minus children SSE, i.e. the actual
            reduction), or an empty dict when no admissible split exists.
        """
        best_split = {}
        target = dataset.iloc[:, -1].to_numpy(dtype=float)
        parent_sse = self._sum_sq_dev(target)
        lowest_sse = float("inf")

        for feature_index in range(num_features):
            values = dataset.iloc[:, feature_index].to_numpy()
            # Every distinct observed value is a candidate threshold.
            for threshold in np.unique(values):
                left_mask = values <= threshold
                right_mask = values > threshold
                if not left_mask.any() or not right_mask.any():
                    continue
                child_sse = self.SSE(target, target[left_mask], target[right_mask])
                # Strict '<' keeps the first of several equally good splits.
                if child_sse < lowest_sse:
                    lowest_sse = child_sse
                    best_split["feature_index"] = feature_index
                    best_split["threshold"] = threshold
                    best_split["var_red"] = parent_sse - child_sse

        return best_split

    def split(self, dataset, feature_index, threshold):
        """Partition ``dataset`` on the column at ``feature_index``.

        Returns:
            ``(left, right)`` where ``left`` holds the rows with
            ``value <= threshold`` and ``right`` the rows with
            ``value > threshold``.
        """
        column = dataset.iloc[:, feature_index]
        dataset_left = dataset.loc[column <= threshold]
        dataset_right = dataset.loc[column > threshold]
        return dataset_left, dataset_right

    @staticmethod
    def _sum_sq_dev(values):
        """Sum of squared deviations from the mean; 0.0 for empty input."""
        arr = np.asarray(values, dtype=float).ravel()
        if arr.size == 0:
            return 0.0
        return float(arr.size * np.var(arr))

    def SSE(self, parent, l_child, r_child):
        """Total sum of squared errors of the two children (lower is better).

        ``parent`` is accepted for interface compatibility and is not used.
        """
        return self._sum_sq_dev(l_child) + self._sum_sq_dev(r_child)

    def predict(self, test):
        """Return a list with one prediction per row of the DataFrame ``test``."""
        predictions = []
        for _, row in test.iterrows():
            node = self.root
            # Descend while the node has children; leaves have none.
            while node.children:
                branch = 0 if row[node.name] <= node.value else 1
                node = node.children[branch]
            predictions.append(node.value)
        return predictions




In [None]:
from sklearn.model_selection import train_test_split

# Every column except the last is a feature; the last column is the target.
X, Y = data.iloc[:, :-1], data.iloc[:, -1]
# Hold out 20% of the rows for evaluation; the fixed random_state makes the split repeatable.
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y, test_size=0.2, random_state=41
)

In [None]:
# Fit on the training rows only. Fitting on the full `data` would include the
# rows of X_test and make the evaluation on X_test meaningless.
reg = RegressionTree(data.loc[X_train.index], "y")


In [None]:
from sklearn.metrics import mean_squared_error

Y_pred = reg.predict(X_test)
# Root-mean-squared error on the held-out rows; left as the last expression so the cell displays it.
np.sqrt(mean_squared_error(y_true=Y_test, y_pred=Y_pred))

# Q-learning

In [None]:
import numpy as np

class Environment:
    """Small deterministic grid world.

    ``location`` maps each grid cell to a 1-based state id and
    ``reward_table`` holds the reward of each cell in the same layout.
    Actions are indices into ``actions`` (up, right, down, left). A move that
    would leave the grid keeps the agent in place. An episode ends when the
    agent enters ``goal_state``.
    """

    def __init__(self):
        self.num_states = 6
        self.num_actions = 4
        self.reward_table = np.array([
            [1, 1, 100],
            [1, 1, 1],
        ])
        self.location = np.array([
            [1, 2, 6],
            [3, 4, 5],
        ])
        self.actions = np.array(["up", "right", "down", "left"])
        # Terminal state id (the cell whose reward is 100).
        self.goal_state = 6

    def reset(self):
        """Start a new episode in a random non-terminal state and return it."""
        # Starting on the goal would begin the episode in the terminal state.
        starts = self.location[self.location != self.goal_state]
        self.state = int(np.random.choice(starts))
        return self.state

    def step(self, action):
        """Apply ``action`` to the current state.

        Returns:
            ``(next_state, reward, done)`` where ``reward`` is the reward of
            the cell that was entered and ``done`` tells whether that cell is
            the goal.
        """
        r, c = np.argwhere(self.location == self.state)[0]
        next_state = self.get_next_location(r, c, action)
        # Reward the cell being entered, not the one being left; otherwise the
        # goal reward is never collected on the transition that reaches it.
        next_r, next_c = np.argwhere(self.location == next_state)[0]
        reward = self.reward_table[next_r, next_c]
        done = (next_state == self.goal_state)
        self.state = next_state
        return next_state, reward, done

    def _step(self, state, action):
        """Return the state reached from ``state`` by ``action`` without
        touching the environment's own current state."""
        r, c = np.argwhere(self.location == state)[0]
        return self.get_next_location(r, c, action)

    def get_next_location(self, current_row_index, current_column_index, action_index):
        """Return the state id reached from the given cell by ``action_index``.

        Moves that would cross the grid boundary leave the position unchanged.
        """
        last_row = self.location.shape[0] - 1
        last_column = self.location.shape[1] - 1
        move = self.actions[action_index]

        new_row_index = current_row_index
        new_column_index = current_column_index
        if move == 'up' and current_row_index > 0:
            new_row_index -= 1
        elif move == 'right' and current_column_index < last_column:
            new_column_index += 1
        elif move == 'down' and current_row_index < last_row:
            new_row_index += 1
        elif move == 'left' and current_column_index > 0:
            new_column_index -= 1
        return self.location[new_row_index, new_column_index]

class QLearning:
    """Tabular Q-learning agent.

    ``env`` must expose ``num_states``, ``num_actions``, ``reset()``,
    ``step(action) -> (next_state, reward, done)`` and
    ``_step(state, action) -> next_state``. States are 1-based, so state ``s``
    is stored in row ``s - 1`` of ``q_table``.

    Args:
        env: the environment to learn in.
        num_episodes: number of training episodes run by ``train()``.
        gamma: discount factor.
        alpha: learning rate of the Q-value update.
        epsilon: probability of taking a random (exploratory) action.
    """

    def __init__(self, env, num_episodes, gamma=0.8, alpha=0.5, epsilon=0.1):
        self.env = env
        self.num_episodes = num_episodes
        self.gamma = gamma
        self.alpha = alpha
        self.prev = None  # not used by this class; kept so existing attribute access still works
        self.epsilon = epsilon
        self.q_table = np.zeros((env.num_states, env.num_actions))

    def choose_action(self, state, epsilon=None):
        """Epsilon-greedy action for ``state``.

        With probability ``epsilon`` (default: ``self.epsilon``) a uniformly
        random action is returned, otherwise the action with the highest
        Q-value. ``epsilon=0`` is therefore purely greedy.
        """
        if epsilon is None:
            epsilon = self.epsilon
        if np.random.random() < epsilon:
            return np.random.randint(0, self.env.num_actions)
        return np.argmax(self.q_table[state - 1, :])

    def train(self):
        """Run ``num_episodes`` episodes and return the learned Q-table."""
        for _ in range(self.num_episodes):
            state = self.env.reset()
            done = False

            while not done:
                action = self.choose_action(state)
                next_state, reward, done = self.env.step(action)
                # Move the old estimate a fraction alpha towards the
                # one-step target r + gamma * max_a' Q(s', a').
                target = reward + self.gamma * np.max(self.q_table[next_state - 1, :])
                current = self.q_table[state - 1, action]
                self.q_table[state - 1, action] = current + self.alpha * (target - current)
                state = next_state
        return self.q_table

    def get_shortest_path(self, state):
        """Follow the greedy policy from ``state`` to the goal.

        Returns:
            A list of single-element lists ``[[s0], [s1], ...]`` ending at the
            goal, or ``[]`` when ``state`` already is the goal.

        Raises:
            RuntimeError: if the greedy policy has not reached the goal after
                ``env.num_states`` moves (it would otherwise cycle forever).
        """
        # Environments without a goal_state attribute fall back to state 6.
        goal = getattr(self.env, "goal_state", 6)
        if state == goal:
            return []

        shortest_path = [[state]]
        # A path without repeated states makes fewer moves than there are states.
        max_moves = self.env.num_states
        cnt = 0
        while not state == goal:
            if cnt >= max_moves:
                raise RuntimeError(
                    "greedy policy did not reach the goal; the Q-table may need more training"
                )
            cnt += 1
            # epsilon=0 -> always the best known action
            action = self.choose_action(state, 0.0)
            print(state, " : ", action)
            state = self.env._step(state, action)
            shortest_path.append([state])
        return shortest_path


In [None]:
# Seed NumPy's global RNG so the random start states and exploratory actions,
# and therefore the printed Q-table, are the same on every run.
np.random.seed(0)

env = Environment()
q_learning = QLearning(env, num_episodes=100)
q_table = q_learning.train()
print(q_table)
# Greedy path from state 5 to the goal; shown as the cell output.
q_learning.get_shortest_path(5)

[[220.32216525 274.15270656 160.10470656 220.32216525]
 [228.73888    353.35270656 198.8808832  220.32216525]
 [198.8808832  198.8808832  160.10470656 160.10470656]
 [247.351104   274.15270656 198.8808832  160.10470656]
 [353.35270656 247.351104   247.351104   198.8808832 ]
 [440.4408832  425.551104   327.73888    346.351104  ]]
5  :  0


[[5], [6]]