In [None]:
from collections import Counter
from typing import Optional

import numpy as np
import pandas as pd
import scipy.stats as stats
from scipy.stats import entropy
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.model_selection import train_test_split

In [None]:
# functions to redo phase 1
def get_outliers(column: pd.Series):
    """Return the entries of ``column`` that fall outside the 1.5 * IQR fences.

    The fences are ``Q1 - 1.5 * IQR`` and ``Q3 + 1.5 * IQR``, where ``Q1``/``Q3``
    are the 25th/75th percentiles of ``column``. The result is the sub-series of
    offending values with their original index labels; missing values compare
    false against both fences and are therefore never reported.
    """
    q1, q3 = column.quantile(0.25), column.quantile(0.75)
    whisker = 1.5 * (q3 - q1)
    below_fence = column < q1 - whisker
    above_fence = column > q3 + whisker
    return column[below_fence | above_fence]
def iterative_reformat(processes_ptr: pd.DataFrame, connections_ptr: pd.DataFrame) -> pd.DataFrame:
    """Join the two raw tables and clean the result into a modelling table.

    Steps:
      1. Parse ``ts`` in both inputs and inner-join them on ``ts``, ``imei``
         and ``mwra``.
      2. Cast ``mwra`` to ``int64`` and drop the join keys ``ts`` and ``imei``.
      3. For every remaining feature column:
         * drop it when more than 5% of its values are missing or more than
           5% are outliers (as reported by ``get_outliers``);
         * otherwise fill missing values with the average of the per-``mwra``
           class means, so the filled value favours neither class;
         * if it has any outliers, clip it to ``[Q1 - 2.5*IQR, Q3 + 2.5*IQR]``,
           which only pulls in the most extreme values.

    The target column ``mwra`` is never treated as a feature: it is not
    screened, imputed or clipped. For an imbalanced 0/1 target the minority
    class would otherwise look like "outliers" and be dropped or clipped away.

    Side effect: the ``ts`` column of BOTH input frames is converted to
    datetime in place (the caller's frames are modified).

    Returns the cleaned frame; an empty join is returned as-is.
    """
    connections_ptr['ts'] = pd.to_datetime(connections_ptr['ts'])
    processes_ptr['ts'] = pd.to_datetime(processes_ptr['ts'])
    merged = processes_ptr.merge(connections_ptr, on=['ts', 'imei', 'mwra'], how='inner')
    merged['mwra'] = merged['mwra'].astype('int64')
    merged = merged.drop(columns=['ts', 'imei'])

    # nothing to clean, and the ratios below would divide by zero
    if merged.empty:
        return merged

    row_count = merged.shape[0]
    to_drop = []
    # handle null values and outliers
    for column in merged.columns:
        # the label must stay untouched (see docstring)
        if column == 'mwra':
            continue

        # if more than 5% are NaN values or more than 5% are outliers, we don't use that column
        column_outliers = get_outliers(merged[column])
        nan_share = merged[column].isna().sum() / row_count
        outlier_share = column_outliers.shape[0] / row_count
        if nan_share > 0.05 or outlier_share > 0.05:
            to_drop.append(column)
            continue

        # if there are some null values, we replace them with a value that's neutral in respect to mwra
        if merged[column].isnull().any():
            # means of the column for rows with and without malware related activity ...
            means_per_mwra = merged.groupby('mwra')[column].mean()
            # ... averaged, so the manufactured value is unlikely to affect the predicted mwra
            imputed_value = means_per_mwra.mean()
            # assign back instead of a chained `fillna(inplace=True)`, which is a
            # no-op under pandas Copy-on-Write and would leave the NaNs in place
            merged[column] = merged[column].fillna(imputed_value)

        # if there are any outliers, we clip only the most extreme ones; clipping
        # at the 1.5*IQR fences would pile too many values onto the edges
        if column_outliers.shape[0]:
            iqr = stats.iqr(merged[column])
            lower_limit = merged[column].quantile(0.25) - 2.5 * iqr
            upper_limit = merged[column].quantile(0.75) + 2.5 * iqr
            merged[column] = merged[column].clip(lower=lower_limit, upper=upper_limit)
    return merged.drop(columns=to_drop)
# functions to redo phase 2

In [None]:
# getting the data from previous phases (tab-separated; only empty fields count as missing)
connections = pd.read_csv('data/connections.csv', sep='\t', keep_default_na=False, na_values='')
processes = pd.read_csv('data/processes.csv', sep='\t', keep_default_na=False, na_values='')
combined_table = iterative_reformat(processes, connections)

# JANKA added: halve every feature (the label is left alone), then cast the
# whole table to integers, which truncates the fractional part
columns_to_divide = combined_table.columns.difference(['mwra'])
combined_table[columns_to_divide] = combined_table[columns_to_divide] / 2
combined_table = combined_table.astype('int64')

# features / label, then a reproducible 80/20 train/test split
y = combined_table['mwra']
X = combined_table.drop(columns=['mwra'])
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=69)

# Phase 3: Machine Learning

As we are dealing with continuous data, we use a variant of the ID3 algorithm that differs from the one shown in the lecture: instead of creating one branch per distinct feature value, each node performs a binary split of a feature at a threshold.  
Our implementation was inspired by: https://www.geeksforgeeks.org/iterative-dichotomiser-3-id3-algorithm-from-scratch/

In [None]:
def calculate_entropy(data):
    """Return the Shannon entropy, in bits, of the label distribution in ``data``.

    ``data`` is a one-dimensional collection of class labels. Labels are counted
    with ``np.unique``, so any sortable label values work (negative integers,
    floats, strings), not only the non-negative integers ``np.bincount`` accepts.

    An empty collection has no uncertainty and yields ``0.0``; this is handled
    explicitly rather than left to a 0/0 normalisation inside SciPy.
    """
    labels = np.asarray(data)
    if labels.size == 0:
        return 0.0
    _, counts = np.unique(labels, return_counts=True)
    probabilities = counts / labels.size
    return entropy(probabilities, base=2)  # base=2 to use log2


class ID3classifier:
    """Binary decision-tree classifier for numeric features, grown by entropy gain.

    Every internal node tests one feature column against one threshold: samples
    with ``value < split_point`` go to the left child, all others to the right.
    Candidate thresholds are the distinct values the feature takes among the
    node's training samples; the candidate with the largest entropy gain
    (computed with the module-level ``calculate_entropy``) is chosen.

    Parameters
    ----------
    max_depth : int or None
        Maximum number of splits between the root and any leaf. ``None``
        (default) leaves the depth unlimited.
    min_samples_split : int
        A node holding fewer samples than this becomes a leaf.
    threshold : float
        A node is split only if the best entropy gain is strictly greater
        than this value.
    """

    class Node:
        """A tree node: either an internal test or a leaf carrying a label."""

        def __init__(self, feature=None, result=None, value=None, left_child=None, right_child=None):
            # column index of the feature being tested (None on a leaf)
            self.feature: Optional[int] = feature
            # predicted label on a leaf (None on an internal node)
            self.result = result
            # samples with feature value < split_point go left, the rest go right
            self.split_point = value
            self.left_child: Optional['ID3classifier.Node'] = left_child
            self.right_child: Optional['ID3classifier.Node'] = right_child

    def __init__(self, max_depth=None, min_samples_split=2, threshold=0):
        self.root: Optional['ID3classifier.Node'] = None
        self.max_depth: Optional[int] = max_depth
        self.min_samples_split: int = min_samples_split
        self.threshold: float = threshold

    def fit(self, data_samples: list[list[float]], target_values: list[int]):
        """Grow the tree from the training samples and store it in ``self.root``.

        ``data_samples`` is a samples-by-features table and ``target_values``
        holds one label per sample; nested lists and NumPy arrays are both
        accepted.

        Raises
        ------
        ValueError
            If the samples are not two-dimensional, the number of labels does
            not match the number of samples, or the data set is empty.
        """
        X = np.asarray(data_samples)
        y = np.asarray(target_values)
        if X.ndim != 2:
            raise ValueError('data_samples must be two-dimensional (samples x features).')
        if y.ndim != 1 or y.shape[0] != X.shape[0]:
            raise ValueError('target_values must hold exactly one label per sample.')
        if y.shape[0] == 0:
            raise ValueError('Cannot fit a tree on an empty data set.')
        self.root = self._build(X, y)

    def _leaf(self, y):
        """Create a leaf predicting the most common label in ``y``."""
        return self.Node(result=Counter(y).most_common(1)[0][0])

    def _build(self, X, y, depth=0):
        """Recursively build the subtree for samples ``X`` with labels ``y``.

        ``depth`` is the number of splits already made above this node.
        """
        # halt conditions: pure node, depth limit reached, too few samples to split
        if np.unique(y).size == 1:
            return self._leaf(y)
        if self.max_depth is not None and depth >= self.max_depth:
            return self._leaf(y)
        if len(y) < self.min_samples_split:
            return self._leaf(y)

        current_entropy = calculate_entropy(y)
        sample_count = len(y)
        best_entropy_gain = 0
        best_feature = None
        best_split_point = None
        best_left_mask = None

        for feature in range(X.shape[1]):
            column = X[:, feature]
            # we are looking for the best split point (= split with highest entropy gain)
            for split_point in set(column):
                left_mask = column < split_point
                # nothing goes left (e.g. the smallest value): such a "split" has
                # zero gain and could never be selected, so skip the work
                if not left_mask.any():
                    continue
                left_y, right_y = y[left_mask], y[~left_mask]
                left_entropy = calculate_entropy(left_y)
                right_entropy = calculate_entropy(right_y)
                left_p = len(left_y) / sample_count
                right_p = 1 - left_p
                entropy_gain = current_entropy - (left_p * left_entropy + right_p * right_entropy)
                if entropy_gain > best_entropy_gain:
                    best_entropy_gain = entropy_gain
                    best_feature = feature
                    best_split_point = split_point
                    # remember only the mask; the sample rows are sliced once, below
                    best_left_mask = left_mask

        # no useful split found (or not useful enough): most common label by default
        if best_feature is None or best_entropy_gain <= self.threshold:
            return self._leaf(y)

        best_right_mask = ~best_left_mask
        left_child = self._build(X[best_left_mask], y[best_left_mask], depth + 1)
        right_child = self._build(X[best_right_mask], y[best_right_mask], depth + 1)
        return self.Node(feature=best_feature, value=best_split_point,
                         left_child=left_child, right_child=right_child)

    def predict(self, item):
        """Predict the label of a single sample (a sequence indexed by feature column).

        Returns ``None`` when the fitted tree is a single leaf.

        Raises
        ------
        ValueError
            If the model has not been fitted yet.
        """
        if self.root is None:
            raise ValueError('The model has not been trained yet.')

        # the depth has to be at least 2, including root
        # NOTE(review): a single-leaf tree still carries a valid label in
        # root.result; returning None here looks like a deliberate rule, so it
        # is kept, but confirm that callers really want None in that case
        if self.root.left_child is None and self.root.right_child is None:
            return None
        return self._predict(self.root, item)

    def _predict(self, node, item):
        """Walk down from ``node`` following the split tests and return the leaf's label."""
        while node.result is None:
            if item[node.feature] < node.split_point:
                node = node.left_child
            else:
                node = node.right_child
        return node.result
        

In [None]:
# train a tree with the default settings on the NumPy view of the training split
classifier = ID3classifier()
classifier.fit(X_train.to_numpy(), y_train.to_numpy())

In [None]:
# predict() handles one sample at a time, so map it over the rows of each split
y_pred_test = np.array(list(map(classifier.predict, X_test.to_numpy())))
y_pred_train = np.array(list(map(classifier.predict, X_train.to_numpy())))

In [None]:
# accuracy on the held-out test split, then on the training split, shown as a (test, train) pair
accuracy_score(y_test, y_pred_test), accuracy_score(y_train, y_pred_train)

In [None]:
# precision on the test split; average=None reports one score per class instead of a single average
precision_score(y_test, y_pred_test, average=None)