In [None]:
import json
from collections import Counter

import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

Dataset Preparation

In [None]:
# Where the CSV lives (NOTE(review): looks like a Colab Google Drive mount —
# confirm before running elsewhere) and which of its columns are loaded.
DATASET_PATH = "/content/drive/MyDrive/Study/S5/AI/twitter_human_bots_dataset.csv"
SELECTED_COLUMNS = [
    "default_profile",
    "favourites_count",
    "friends_count",
    "followers_count",
    "geo_enabled",
    "statuses_count",
    "verified",
    "average_tweets_per_day",
    "account_age_days",
    "account_type",
]

# Only the first 1000 data rows are read, restricted to the columns above.
data = pd.read_csv(DATASET_PATH, skiprows=0, nrows=1000, usecols=SELECTED_COLUMNS)
data.tail(10)

Unnamed: 0,default_profile,favourites_count,followers_count,friends_count,geo_enabled,statuses_count,verified,average_tweets_per_day,account_age_days,account_type
990,False,23389,723,131,True,10801,False,2.531,4267,human
991,False,2413,84437,1216,False,71577,True,17.189,4164,human
992,False,6901,7384,1565,False,20566,True,5.675,3624,human
993,False,16606,525,163,False,4898,False,3.355,1460,bot
994,False,8040,795,761,True,9417,False,2.855,3298,human
995,True,42,775,1205,False,949,False,0.54,1756,human
996,False,40329,184,2255,False,2090,False,0.532,3927,human
997,False,1,166,89,True,12497,False,3.281,3809,bot
998,True,948,47,177,False,2036,False,2.065,986,human
999,False,1353,801138,710,False,13292,True,3.831,3470,human


In [None]:
# Print the column dtypes, non-null counts and memory usage of the loaded frame
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1000 entries, 0 to 999
Data columns (total 10 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   default_profile         1000 non-null   bool   
 1   favourites_count        1000 non-null   int64  
 2   followers_count         1000 non-null   int64  
 3   friends_count           1000 non-null   int64  
 4   geo_enabled             1000 non-null   bool   
 5   statuses_count          1000 non-null   int64  
 6   verified                1000 non-null   bool   
 7   average_tweets_per_day  1000 non-null   float64
 8   account_age_days        1000 non-null   int64  
 9   account_type            1000 non-null   object 
dtypes: bool(3), float64(1), int64(5), object(1)
memory usage: 57.7+ KB


In [None]:
# Features mapping
data["default_profile"] = data["default_profile"].astype(int)
data["geo_enabled"] = data["geo_enabled"].astype(int)
data["verified"] = data["verified"].astype(int)
data.tail(10)

Unnamed: 0,default_profile,favourites_count,followers_count,friends_count,geo_enabled,statuses_count,verified,average_tweets_per_day,account_age_days,account_type
990,0,23389,723,131,1,10801,0,2.531,4267,human
991,0,2413,84437,1216,0,71577,1,17.189,4164,human
992,0,6901,7384,1565,0,20566,1,5.675,3624,human
993,0,16606,525,163,0,4898,0,3.355,1460,bot
994,0,8040,795,761,1,9417,0,2.855,3298,human
995,1,42,775,1205,0,949,0,0.54,1756,human
996,0,40329,184,2255,0,2090,0,0.532,3927,human
997,0,1,166,89,1,12497,0,3.281,3809,bot
998,1,948,47,177,0,2036,0,2.065,986,human
999,0,1353,801138,710,0,13292,1,3.831,3470,human


Decision tree algorithm

In [None]:
class Node(dict):
    def __init__(self, feature_index=None, threshold=None, left=None, right=None, info_gain=None, value=None):
        ''' Create a tree node whose attributes and dict items share one storage. '''

        # Point the instance namespace at the mapping itself: from here on
        # `node.x` and `node["x"]` read and write the same entry.
        self.__dict__ = self

        # Fill every field through the mapping interface, in a fixed order.
        self.update(
            # used by decision nodes
            feature_index=feature_index,
            threshold=threshold,
            left=left,
            right=right,
            info_gain=info_gain,
            # used by leaf nodes
            value=value,
        )

In [None]:
class DecisionTreeClassifier():
    ''' Binary decision tree grown greedily by maximising entropy-based information gain.

    The fitted tree is stored in `self.root` as nested `Node` objects.
    '''

    def __init__(self, min_samples_split=2, max_depth=2):
        ''' constructor

        min_samples_split: a node holding fewer samples than this is not split.
        max_depth: deepest level (the root is level 0) at which a node may still be split.
        '''

        # initialize the root of the tree
        self.root = None

        # stopping conditions
        self.min_samples_split = min_samples_split
        self.max_depth = max_depth

    def build_tree(self, dataset, curr_depth=0):
        ''' recursive function to build the tree

        dataset: 2-D array whose last column holds the labels and whose
                 other columns hold the features.
        curr_depth: level of the node being built (0 for the root).
        Returns a decision `Node`, or a leaf `Node` carrying the majority label.
        '''

        X, Y = dataset[:,:-1], dataset[:,-1]
        num_samples, num_features = np.shape(X)

        # split until stopping conditions are met.
        # The depth test is inclusive: a node at depth == max_depth is still
        # split, so leaves can sit one level below max_depth.
        if num_samples>=self.min_samples_split and curr_depth<=self.max_depth:
            # find the best split
            best_split = self.get_best_split(dataset, num_samples, num_features)
            # get_best_split returns an empty dict when no threshold separates
            # the rows (e.g. all feature rows identical); treat that as "no
            # gain" instead of failing on the missing key.
            if best_split.get("info_gain", 0)>0:
                # recur left
                left_subtree = self.build_tree(best_split["dataset_left"], curr_depth+1)
                # recur right
                right_subtree = self.build_tree(best_split["dataset_right"], curr_depth+1)
                # return decision node
                return Node(best_split["feature_index"], best_split["threshold"],
                            left_subtree, right_subtree, best_split["info_gain"])

        # compute leaf node
        leaf_value = self.calculate_leaf_value(Y)
        # return leaf node
        return Node(value=leaf_value)

    def get_best_split(self, dataset, num_samples, num_features):
        ''' function to find the best split

        Tries every distinct value of every feature as a `<=` threshold and
        keeps the one with the highest information gain.
        Returns a dict with keys "feature_index", "threshold", "dataset_left",
        "dataset_right" and "info_gain", or an empty dict when no threshold
        leaves both sides non-empty. `num_samples` is accepted but not used.
        '''

        # dictionary to store the best split
        best_split = {}
        max_info_gain = -float("inf")

        # labels of the node being split; identical for every candidate
        y = dataset[:, -1]

        # loop over all the features
        for feature_index in range(num_features):
            feature_values = dataset[:, feature_index]
            possible_thresholds = np.unique(feature_values)
            # loop over all the feature values present in the data
            for threshold in possible_thresholds:
                # get current split
                dataset_left, dataset_right = self.split(dataset, feature_index, threshold)
                # check if childs are not null
                if len(dataset_left)>0 and len(dataset_right)>0:
                    left_y, right_y = dataset_left[:, -1], dataset_right[:, -1]
                    # compute information gain
                    curr_info_gain = self.information_gain(y, left_y, right_y)
                    # update the best split if needed (strict '>' keeps the
                    # first candidate among equally good ones)
                    if curr_info_gain>max_info_gain:
                        best_split["feature_index"] = feature_index
                        best_split["threshold"] = threshold
                        best_split["dataset_left"] = dataset_left
                        best_split["dataset_right"] = dataset_right
                        best_split["info_gain"] = curr_info_gain
                        max_info_gain = curr_info_gain

        # return best split
        return best_split

    def split(self, dataset, feature_index, threshold):
        ''' function to split the data

        Returns (rows with feature <= threshold, rows with feature > threshold).
        '''

        feature_values = dataset[:, feature_index]
        # Two independent masks (rather than one mask and its negation) so a
        # value that satisfies neither comparison lands in neither side.
        left_mask = np.asarray(feature_values <= threshold, dtype=bool)
        right_mask = np.asarray(feature_values > threshold, dtype=bool)
        return dataset[left_mask], dataset[right_mask]

    def information_gain(self, parent, l_child, r_child):
        ''' function to compute information gain

        Parent entropy minus the size-weighted entropy of the two children.
        '''

        weight_l = len(l_child) / len(parent)
        weight_r = len(r_child) / len(parent)
        gain = self.entropy(parent) - (weight_l*self.entropy(l_child) + weight_r*self.entropy(r_child))
        return gain

    def entropy(self, y):
        ''' function to compute entropy (base 2) of the labels in y '''

        _, class_counts = np.unique(y, return_counts=True)
        num_labels = len(y)
        entropy = 0
        for class_count in class_counts:
            p_cls = class_count / num_labels
            entropy += -p_cls * np.log2(p_cls)
        return entropy

    def calculate_leaf_value(self, Y):
        ''' function to compute leaf node

        Returns the most frequent label in Y; among equally frequent labels
        the one that appears first in Y wins.
        '''

        # Counter tallies in one pass and keeps first-seen order, so max()
        # over its keys resolves ties by first occurrence.
        label_counts = Counter(list(Y))
        return max(label_counts, key=label_counts.get)

    def fit(self, X, Y):
        ''' function to train the tree

        X: 2-D array of features, one row per sample.
        Y: labels, either as a column of shape (n_samples, 1) or as a flat
           sequence of length n_samples.
        Stores the resulting tree in `self.root`.
        '''

        Y = np.asarray(Y)
        if Y.ndim == 1:
            # concatenation along axis 1 needs the labels as a column
            Y = Y.reshape(-1, 1)
        dataset = np.concatenate((X, Y), axis=1)
        self.root = self.build_tree(dataset)

Preparing data for train and test

In [None]:
# Features are every column but the last; the labels are kept as a column
# of shape (n_samples, 1) because DecisionTreeClassifier.fit concatenates
# them onto X along axis 1.
X = data.iloc[:, :-1].values
Y = data.iloc[:, -1].values.reshape(-1,1)
# Hold out 20% of the rows for testing; the fixed random_state makes the
# split repeatable. (train_test_split is imported in the top import cell.)
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=.2, random_state=41)

Training...

In [None]:
# Fit the hand-written decision tree on the training split; the print below
# only signals that fitting has returned.
classifier = DecisionTreeClassifier(min_samples_split=2, max_depth=3)
classifier.fit(X_train,Y_train)
print("Finished.")

Finished.


Model Serializer

In [None]:
class TreeSerializer():
  ''' Writes a tree (nested dicts) together with its feature names to a JSON file. '''

  def __init__(self, tree, features):
    ''' constructor

    tree: root of the tree, as a dict (or dict subclass) of nested dicts.
    features: list of feature names stored alongside the tree.
    '''

    self.model = {'features': features, 'model': tree}

  def export(self, filename="model.json"):
    ''' Write the model to `filename` as indented JSON, omitting None-valued keys.

    Neither `self.model` nor the tree passed to the constructor is modified.
    '''
    # shows the model as stored, i.e. before None entries are stripped
    print(self.model)
    # Clean data on a pruned copy so nodes shared with the caller's tree
    # keep all of their keys
    cleaned_tree = self._without_none(self.model)
    # serialize; the default hook lets NumPy scalars/arrays through
    serialized_tree = json.dumps(cleaned_tree, indent=2, default=self._to_builtin)
    # the context manager closes the file even if the write fails
    with open(filename, "w") as file:
      file.write(serialized_tree)

  def del_none(self, d):
    """
    Delete keys with the value ``None`` in a dictionary, recursively.
    This alters the input so you may wish to ``copy`` the dict first.
    Returns the same dictionary object that was passed in.
    """
    for key, value in list(d.items()):
      if value is None:
        del d[key]
      elif isinstance(value, dict):
        self.del_none(value)
    return d

  def _without_none(self, d):
    ''' Return a new plain dict equal to `d` minus None-valued keys, recursively. '''
    return {
      key: self._without_none(value) if isinstance(value, dict) else value
      for key, value in d.items()
      if value is not None
    }

  @staticmethod
  def _to_builtin(obj):
    ''' json.dumps fallback: convert objects exposing `tolist()` (NumPy scalars and arrays). '''
    to_list = getattr(obj, "tolist", None)
    if callable(to_list):
      return to_list()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

Exporting Model with used features

In [None]:
# Feature names are all column names except the trailing label column.
features = data.columns.values.tolist()[:-1]
# Node.copy() is dict's shallow copy: the returned top-level dict is new,
# but the nested child nodes are still the ones held by classifier.root.
serializer = TreeSerializer(classifier.root.copy(), features)
serializer.export()

{'features': ['default_profile', 'favourites_count', 'followers_count', 'friends_count', 'geo_enabled', 'statuses_count', 'verified', 'average_tweets_per_day', 'account_age_days'], 'model': {'feature_index': 2, 'threshold': 66.0, 'left': {'feature_index': 8, 'threshold': 1445.0, 'left': {'feature_index': 8, 'threshold': 1434.0, 'left': {'feature_index': 1, 'threshold': 29.0, 'left': {'feature_index': None, 'threshold': None, 'left': None, 'right': None, 'info_gain': None, 'value': 'bot'}, 'right': {'feature_index': None, 'threshold': None, 'left': None, 'right': None, 'info_gain': None, 'value': 'bot'}, 'info_gain': 0.08098167790100352, 'value': None}, 'right': {'feature_index': None, 'threshold': None, 'left': None, 'right': None, 'info_gain': None, 'value': 'human'}, 'info_gain': 0.2126364277729874, 'value': None}, 'right': {'feature_index': 2, 'threshold': 27.0, 'left': {'feature_index': 1, 'threshold': 64.0, 'left': {'feature_index': None, 'threshold': None, 'left': None, 'right': 

Predictor algorithm

In [None]:
class Predicter():
  ''' Classifies feature rows by walking a tree of nodes from the root to a leaf. '''

  def __init__(self, model):
    ''' constructor

    model: root node of the tree. Nodes are read through the attributes
           `value`, `feature_index`, `threshold`, `left` and `right`.
    '''

    self.model = model

  def predict(self, X):
    ''' function to predict new dataset

    Returns a list with one predicted value per row of X.
    '''

    return [self.make_prediction(x, self.model) for x in X]

  def make_prediction(self, x, node):
    ''' function to predict a single data point

    Starting at `node`, goes left while the tested feature is <= the node's
    threshold and right otherwise, until a node with a non-None `value` is
    reached; that value is returned.
    '''

    # Iterative descent: tree depth is not limited by the recursion limit.
    while True:
      # a node may have no `value` attribute at all, hence getattr
      leaf_value = getattr(node, 'value', None)
      if leaf_value is not None:
        return leaf_value
      if x[node.feature_index] <= node.threshold:
        node = node.left
      else:
        node = node.right

Model Deserializer

In [None]:
class TreeDeserializer():
  ''' Rebuilds a tree of Node objects from a JSON model file. '''

  def importModel(self, filename="model.json"):
    ''' Load `filename` and return the root Node built from its 'model' entry. '''
    # the context manager closes the file even if parsing fails
    with open(filename) as f:
      modelData = json.load(f)

    # Build the tree
    return self.build_tree(modelData['model'])

  def build_tree(self, tree):
    ''' Recursively convert a nested dict into Node objects.

    Keys absent from a dict become None on the resulting Node; a `tree` of
    None (an absent child) is returned as None.
    '''

    if tree is None: return None

    return Node(
        feature_index = tree.get('feature_index'),
        info_gain = tree.get('info_gain'),
        threshold = tree.get('threshold'),
        left = self.build_tree(tree.get('left')),
        right = self.build_tree(tree.get('right')),
        value = tree.get('value')
    )


Model Deserialisation

In [None]:
# Importing the model we saved earlier
deserializer = TreeDeserializer()
model = deserializer.importModel()

Accuracy checker

In [None]:
# Classify the held-out rows with the re-imported tree and report the
# fraction of labels predicted correctly.
# (accuracy_score is imported in the top import cell.)
predicter = Predicter(model)
Y_pred = predicter.predict(X_test)
accuracy_score(Y_test, Y_pred)

0.785