# Энтропия и критерий Джини

$p_i$ - вероятность нахождения системы в i-ом состоянии.

Энтропия Шеннона определяется для системы с N возможными состояниями следующим образом

$S = - \sum_{i=1}^Np_ilog_2p_i$
	 
Критерий Джини (Gini Impurity). Максимизацию этого критерия можно интерпретировать как максимизацию числа пар объектов одного класса, оказавшихся в одном поддереве.

В общем случае критерий Джини считается как
$G = 1 - \sum_k(p_k)^2$
 
Необходимо посчитать, значения Энтропии и критерия Джини

In [7]:
import numpy as np
import math

def get_possibilities(y):
    count = len(y)
    uniq_values = set(y)
    possibilities = []
    for value in uniq_values:
        possibilities.append(len(y[y ==value]) / count)
    return possibilities

def gini_impurity(y: np.ndarray) -> float:
    possibilities = get_possibilities(y)
    sum = 0
    for p in possibilities:
        sum += p * p
    return round(1 - sum, 3)

def entropy(y: np.ndarray) -> float:
    possibilities = get_possibilities(y)
    sum = 0
    for p in possibilities:
        sum += p * math.log2(p)
    return round(-sum, 3)

def calc_criteria(y: np.ndarray) -> (float, float):
    assert y.ndim == 1
    return entropy(y), gini_impurity(y)

y = np.array([1,1,1,1,1,1,0,0,0,0])
calc_criteria(y)

(0.971, 0.48)

# Information gain
Вам надо реализовать функцию inform_gain, которая будет вычислять прирост информации для критерия (энтропия или критерий Джини) при разбиении выбрки по признаку (threshold).

Прирост информации при разбиении выборки по признаку Q (например x≤12) определяется как

$IG(Q)=S_0- \sum_{i=1}^q\frac{N_i}{N}S_i$	
 
где q - число групп после разбиения. $N_i$ - число элементов выборки, у которых признак Q имеет i-ое значение.

И написать функцию get_best_threshold, которая будет находить наилучшее разбиение выборки.

На вход подается:

- X - одномерный массив - значения признака.
- y - значения бинарных классов.
- criteria_func - функция критерия, для которой вычислется наилучшее разбиение (Добавлять код из предыдущей задачи не нужно, мы сами передадим нужную функцию).
- thr - значение разбиения

In [29]:
import numpy as np
import math

def get_possibilities(y):
    count = len(y)
    y = list(y)
    uniq_values = set(y)
    possibilities = []
    for value in uniq_values:
        possibilities.append(len(list(filter(lambda x: x == value, y))) / count)
    return possibilities

def gini_impurity(y: np.ndarray) -> float:
    possibilities = get_possibilities(y)
    sum = 0
    for p in possibilities:
        sum += p * p
    return round(1 - sum, 3)

def entropy(y: np.ndarray) -> float:
    possibilities = get_possibilities(y)
    sum = 0
    for p in possibilities:
        sum += p * math.log2(p)
    return round(-sum, 3)

def len_check_criteria_func(arr, criteria_func):
    if len(arr) == 0:
        return 0
    else:
        return criteria_func(arr)

def inform_gain(X: np.ndarray, y: np.ndarray, threshold: float, criteria_func) -> float:
    s0 = criteria_func(y)
    count = y.shape[0]
    first = []
    second = []
    for i in range(count):
        if X[i] <= threshold:
            first.append(y[i])
        else:
            second.append(y[i])
    s1 = len_check_criteria_func(first, criteria_func)
    s2 = len_check_criteria_func(second, criteria_func)
    return s0 - len(first) / count * s1 - len(second) / count * s2
            

def get_best_threshold(X: np.ndarray, y: np.ndarray, criteria_func) -> (float, float):
    best_threshold = 0
    best_score = 0
    uniq_values = set(X)
    for value in uniq_values:
        score = inform_gain(X, y, value, criteria_func)
        if score > best_score:
            best_score = score
            best_threshold = value
    return best_threshold, best_score

X = np.array([3, 9, 0, 4, 7, 2, 1, 6, 8, 5])
y = np.array([0, 1, 0, 0, 1, 0, 0, 1, 1, 1])
threshold=3
criteria_func=entropy
print(inform_gain(X, y, threshold, criteria_func))

X = np.array([3, 9, 0, 4, 7, 2, 1, 6, 8, 5])
y = np.array([0, 1, 0, 0, 1, 0, 0, 1, 1, 1])
criteria_func=entropy
get_best_threshold(X, y, criteria_func)

0.61


(4, 1.0)

In [8]:
import math

print(1 -(-(5/6) * math.log2(5/6) - (1/6)* math.log2(1/6))* 0.6)

0.6099865470109875


# Best split

Реализуйте функцию find_best_split, которая находит наилучшее разбиение по всем признакам. На вход подется обучающая выборка и функция критерий. Необходимо вернуть: индекс фичи, значение границы (threshold) и результат разбиение (information gain).


In [24]:
import math

def get_possibilities(y):
    count = len(y)
    y = list(y)
    uniq_values = set(y)
    possibilities = []
    for value in uniq_values:
        possibilities.append(len(list(filter(lambda x: x == value, y))) / count)
    return possibilities

def gini_impurity(y: np.ndarray) -> float:
    possibilities = get_possibilities(y)
    sum = 0
    for p in possibilities:
        sum += p * p
    return round(1 - sum, 3)

def entropy(y: np.ndarray) -> float:
    possibilities = get_possibilities(y)
    sum = 0
    for p in possibilities:
        sum += p * math.log2(p)
    return round(-sum, 3)

def inform_gain(X: np.ndarray, y: np.ndarray, threshold: float, criteria_func) -> float:
    s0 = criteria_func(y)
    count = y.shape[0]
    first = []
    second = []
    for i in range(count):
        if X[i] <= threshold:
            first.append(y[i])
        else:
            second.append(y[i])
    s1 = criteria_func(first)
    s2 = criteria_func(second)
    return s0 - len(first) / count * s1 - len(second) / count * s2
            

def get_best_threshold(X: np.ndarray, y: np.ndarray, criteria_func) -> (float, float):
    assert X.ndim == 1
    assert y.ndim == 1
    best_threshold = 0
    best_score = 0
    uniq_values = set(X)
    for value in uniq_values:
        score = inform_gain(X, y, value, criteria_func)
        if score > best_score:
            best_score = score
            best_threshold = value
    return best_threshold, best_score

def find_best_split(X, y, criteria_func):
    assert X.ndim == 2
    assert y.ndim == 1
    best_feature = 0
    best_score = 0
    best_threshold = 0
    
    for i in range(X.shape[1]):
        feature_column = X[:, i]
        threshold, score = get_best_threshold(feature_column, y, criteria_func)
        if score > best_score:
            best_score = score
            best_feature = i
            best_threshold = threshold
    
    return best_feature, best_threshold, best_score

X = np.array([[1, 1], [1, -1], [-1,-1], [-1, 1]])
y = np.array([1, 1, 0, 0])
criteria_func=entropy
find_best_split(X, y, criteria_func)

(0, -1, 1.0)

# Мое дерево решений

Ваша задача реализовать свой простой KNNClassifier для бинарных данных. Вам нужно реализовать 3 метода:

fit - обучение классификатора
predict - предсказание для новых объектов
predict_proba - предсказание вероятностей новых объектов
У нашего классификатора будет лишь два гиперпараметра - максимальная глубина дерева max_depth и критерий разбиения criterion. Энтропия или Джини.

Все функции из предыдущих заданий нужно добавить в этот код.

На вход будет подаваться выборка объектов X. y - результат бинарной классификации 0 или 1.



In [195]:
import math
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin

class MyDecisionTreeClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, max_depth=4, criterion='entropy'): 
        self.eps = 0.001
        self.max_depth = max_depth
        self.criterion = criterion # 'entropy' or 'gini' 
        self.tree = {}
        self._criteria_func = {
            'gini': self._gini_impurity,
            'entropy': self._entropy
        }
        
    def _get_possibilities(self, y):
        count = len(y)
        y = list(y)
        uniq_values = set(y)
        possibilities = []
        for value in uniq_values:
            possibilities.append(len(list(filter(lambda x: x == value, y))) / count)
        return possibilities

    def _entropy(self, y: np.ndarray) -> float:
        possibilities = self._get_possibilities(y)
        sum = 0
        for p in possibilities:
            sum += p * math.log2(p)
        return round(-sum, 3)

    def _gini_impurity(self, y: np.ndarray) -> float:
        possibilities = self._get_possibilities(y)
        sum = 0
        for p in possibilities:
            sum += p * p
        return round(1 - sum, 3)
    
    def _inform_gain(self, X: np.ndarray, y: np.ndarray, threshold: float, criteria_func) -> float:
        print('X ', X)
        print('threshold ', threshold)
        print('y ', y)
        s0 = criteria_func(y)
        count = y.shape[0]
        first = []
        second = []
        for i in range(count):
            if X[i] < threshold - self.eps:
                first.append(y[i])
            else:
                second.append(y[i])
        print('first ', first)
        print('second ', second)
        s1 = criteria_func(first)
        s2 = criteria_func(second)
        return s0 - len(first) / count * s1 - len(second) / count * s2
    
    def _get_best_threshold(self, X: np.ndarray, y: np.ndarray, criteria_func) -> (float, float):
        found_bigger_score = False
        best_threshold = 0
        best_score = 0
        uniq_values = set(X)
        for value in uniq_values:
            score = self._inform_gain(X, y, value, criteria_func)
            print('value ', value, ' score ', score)
            if score > best_score:
                found_bigger_score = True
                best_score = score
                best_threshold = value
        if found_bigger_score:
            return best_threshold, best_score
        return None, None
    
    def _find_best_split(self, X, y, criteria_func):
        best_feature = 0
        best_score = 0
        best_threshold = 0
        found_best = False

        for i in range(X.shape[1]):
            feature_column = X[:, i]
            threshold, score = self._get_best_threshold(feature_column, y, criteria_func)
            
            print('X ', feature_column)
            print('y ', y)
            print('column ', i, ' threshold ', threshold, ' score ', score)
            
            if score is None: 
                continue
            
            if score > best_score:
                found_best = True
                best_score = score
                best_feature = i
                best_threshold = threshold
        if found_best:
            return best_feature, best_threshold
        return None, None
    
    
    def _get_biggest_class(self, y):
        y = list(y)
        return max(set(y), key = y.count)
    
    def _get_probs(self, y):
        count = y.shape[0]
        ones_count = np.count_nonzero(y == 1)
        null_count = count - ones_count
        return [null_count / count, ones_count / count]
        
    def _build_tree(self, X, y, depth=0):
        if depth == 0:
            return
        
        is_leaf = False
        
        split_feature, split_value = self._find_best_split(X, y, self._criteria_func[self.criterion])
        
        if split_feature is None and split_value is None:
            val = self._get_biggest_class(y)
            return {'cond': val, 'leaf': True}
        
        left_inds = X[:, split_feature] < split_value - self.eps
        right_inds = X[:, split_feature] >= split_value - self.eps

        left_tree = self._build_tree(X[left_inds], y[left_inds], depth - 1)
        right_tree = self._build_tree(X[right_inds], y[right_inds], depth - 1)
        
        if left_tree is None and right_tree is None:
            is_leaf = True
            
        if is_leaf and split_feature is not None:
            biggest_class = self._get_biggest_class(y)
            proba = self._get_probs(y)
            
            return {'cond': biggest_class, 'leaf': True, 'proba': proba}
            

        return {'cond': (split_feature, split_value), 'leaf': is_leaf,
                'left': left_tree, 'right': right_tree}
        
    def fit(self, X: np.ndarray, y: np.ndarray):
        self.tree = self._build_tree(X, y, depth=self.max_depth)
        return self        
    
    def _predict(self, X):
        predictions = []
        proba = []
        for elem in X:
            current_tree = self.tree
            while type(current_tree['cond']) is tuple:
                feature = current_tree['cond'][0]
                value = current_tree['cond'][1]
                if elem[feature] < value - self.eps:
                    current_tree = current_tree['left']
                else:
                    current_tree = current_tree['right']
            value = current_tree['cond']
            if 'proba' in current_tree:
                proba.append(current_tree['proba'])
            elif value == 0:
                proba.append([1.0, 0.0])
            else:
                proba.append([0.0, 1.0])
            predictions.append(value)
        return predictions, proba
    
    def predict_proba(self, X: np.ndarray):
        _, proba = self._predict(X)
        return proba
    
    def predict(self, X: np.ndarray): # получаем
        predictions, _ = self._predict(X)
        return predictions

# X_clf = np.array([[-1, 1], [-1, -1], [2.5, 1], [1, 1], [2, 2], [1, -1]])
# y_clf = np.array([0, 0, 0, 1, 1, 1])

X_clf = np.array([[1, 1], [2, -1], [-1, -1], [-1, -4], [2, 3], [3, 1]])
y_clf = np.array([1, 1, 0, 0, 0, 0])

model = MyDecisionTreeClassifier(max_depth=3, criterion='entropy').fit(X_clf, y_clf)
print(model.tree)
y_pred = model.predict(np.array([[2, 1], [0.5, 1]])) 
print(y_pred) # np.array([1, 0])
y_prob = model.predict_proba(np.array([[2, 1], [0.5, 1]])) 
print(y_prob) #np.array([[0.0, 1.0], [1.0, 0.0]])



X  [ 1  2 -1 -1  2  3]
threshold  1
y  [1 1 0 0 0 0]
first  [0, 0]
second  [1, 1, 0, 0]
value  1  score  0.2513333333333334
X  [ 1  2 -1 -1  2  3]
threshold  2
y  [1 1 0 0 0 0]
first  [1, 0, 0]
second  [1, 0, 0]
value  2  score  0.0
X  [ 1  2 -1 -1  2  3]
threshold  3
y  [1 1 0 0 0 0]
first  [1, 1, 0, 0, 0]
second  [0]
value  3  score  0.10883333333333334
X  [ 1  2 -1 -1  2  3]
threshold  -1
y  [1 1 0 0 0 0]
first  []
second  [1, 1, 0, 0, 0, 0]
value  -1  score  0.0
X  [ 1  2 -1 -1  2  3]
y  [1 1 0 0 0 0]
column  0  threshold  1  score  0.2513333333333334
X  [ 1 -1 -1 -4  3  1]
threshold  1
y  [1 1 0 0 0 0]
first  [1, 0, 0]
second  [1, 0, 0]
value  1  score  0.0
X  [ 1 -1 -1 -4  3  1]
threshold  3
y  [1 1 0 0 0 0]
first  [1, 1, 0, 0, 0]
second  [0]
value  3  score  0.10883333333333334
X  [ 1 -1 -1 -4  3  1]
threshold  -4
y  [1 1 0 0 0 0]
first  []
second  [1, 1, 0, 0, 0, 0]
value  -4  score  0.0
X  [ 1 -1 -1 -4  3  1]
threshold  -1
y  [1 1 0 0 0 0]
first  [0]
second  [1, 1, 0, 0, 0]
va

# Наивный Байес

Требуется написать свой классификтор, на основе наивного баеса. Необходимо реализовать аналог MultinomialNB.

$y_{test}=argmax_cln(P(y_{test}=c))+\sum_{j=1}^mln(P(f_j|y_{test}=c)+ \alpha)$, c∈{0,1}

На вход подаются численные категориальные признаки. Классы: 00 и 11. У классификатора будет единственный параметр - alpha.

In [198]:
from sklearn.base import BaseEstimator, ClassifierMixin
from collections import defaultdict
from math import log, inf
import numpy as np

class MyNaiveBayes(BaseEstimator, ClassifierMixin):
    def __init__(self, alpha=1):
        self.alpha = alpha
        self.classes = [0, 1]
        self.class_counts = {0: 0, 1: 0}
        self.class_possibilities = {0: 0, 1: 0}
        self.indicators = {0: {}, 1: {}}
        
    def fit(self, X: np.ndarray, y: np.ndarray):
        self.class_counts = {0: 0, 1: 0}
        self.class_possibilities = {0: 0, 1: 0}
        self.indicators = {0: {}, 1: {}}
        
        n = y.shape[0]
        features_len = len(X[0])
        for cls in self.classes:
            for j in range(features_len):
                self.indicators[cls][j] = {}
        
        for i in range(n):
            cls = y[i]
            self.class_counts[cls] += 1
            for feature_num in range(features_len):
                feature_value = X[i][feature_num]
                if feature_value not in self.indicators[cls][feature_num].keys():
                    self.indicators[cls][feature_num][feature_value] = 0
                self.indicators[cls][feature_num][feature_value] += 1
    
        for cls in self.classes:
            self.class_possibilities[cls] = self.class_counts[cls] / n
            
        return self
            
    def predict(self, X: np.ndarray):
        features_len = len(X[0])
        result = []
        for obj in X:
            max_value = -inf
            result_cls = None
            for cls in self.classes:
                value = log(self.class_possibilities[cls])
                for feature_num in range(features_len):
                    feature_value = obj[feature_num]
                    if feature_value not in self.indicators[cls][feature_num].keys():
                        value += log(self.alpha)
                    else:
                        value += log(self.indicators[cls][feature_num][feature_value] / self.class_counts[cls] + self.alpha)
                if value > max_value:
                    max_value = value
                    result_cls = cls    
            result.append(result_cls)
        return result
    
X_clf = np.array([[1, 1], [1, -1], [-1,-1], [-1, 1]])
y_clf = np.array([1, 1, 0, 0])

model = MyNaiveBayes(alpha=1).fit(X_clf, y_clf)

print(model.class_counts)
print(model.class_possibilities)
print(model.indicators)

y_pred = model.predict(np.array([[1, 2], [-1, -2]]))
print(y_pred) # [1, 0]

{0: 2, 1: 2}
{0: 0.5, 1: 0.5}
{0: {0: {-1: 2}, 1: {-1: 1, 1: 1}}, 1: {0: {1: 2}, 1: {1: 1, -1: 1}}}
[1, 1, 0]


In [92]:
from nltk.tokenize import WordPunctTokenizer, TweetTokenizer
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from nltk.corpus import stopwords
import pandas as pd
import numpy as np
import math
import re

common_words = ['was', 'were', 'and', 'you', 'the', 'did']
stops = set(stopwords.words("english"))      

def preprocess(df: pd.DataFrame):
    wp = WordPunctTokenizer()
    size = df.shape[0]
    preprocessed = []
    
    for i in range(size):
        sentence = df.iloc[i]['text']
#         sentence_parts = sentence.split(' ')
#         sentence_parts = list(filter(lambda x: '@' not in x and '#' not in x, sentence_parts))
#         sentence = ' '.join(sentence_parts)
        tokenized = wp.tokenize(sentence)
        tokenized = list(filter(lambda x: 
                                len(x) > 2 and 
                                x not in common_words and
                                re.search('\d+', x) is None, tokenized))
        new_sentence = ' '.join(tokenized)
        preprocessed.append(new_sentence)
    
    return preprocessed

def predict(df_train: pd.DataFrame, df_test: pd.DataFrame):
    predictions = df_train[:]['airline_sentiment']
    positive_train = df_train[df_train['airline_sentiment'] == 'positive']
    negative_train = df_train[df_train['airline_sentiment'] == 'negative']
    
    positive_indices = positive_train.index.tolist()
    negative_indices = negative_train.index.tolist()
    
    positive_len = len(positive_indices)
    negative_len = len(negative_indices)
    whole_len = positive_len + negative_len
        
    preprocessed_train = preprocess(df_train)    
    vectorizer = CountVectorizer()
    
    X = vectorizer.fit_transform(preprocessed_train)
    frequencies = np.array(X.toarray())
    
#     preprocessed_test = preprocess(df_test)
#     X_test = vectorizer.fit_transform(preprocessed_test)
#     test_frequencies = np.array(X_test.toarray())
    
#     clf = MultinomialNB()
#     clf.fit(frequencies, predictions)
    
#     test_predict = clf.predict(test_frequencies)
#     print(test_predict)
    
    all_words = vectorizer.get_feature_names()
    all_words_dict = dict((all_words[i], i) for i in range(len(all_words)))

    positive_frequencies = frequencies[positive_indices]
    negative_frequencies = frequencies[negative_indices]
    
    word_counts = frequencies.sum(axis=0)
    positive_word_counts = positive_frequencies.sum(axis=0)
    negative_word_counts = negative_frequencies.sum(axis=0)
    
    positive_word_frequencies = (positive_word_counts / word_counts) / positive_len * whole_len
    negative_word_frequencies = (negative_word_counts / word_counts) / negative_len * whole_len
    
    preprocessed_test = preprocess(df_test)
    predictions = []
    
    eps = 0.001
    logged_eps = math.log(eps)
    wp = WordPunctTokenizer()
    
    for i in range(len(preprocessed_test)):
        sentence = preprocessed_test[i]
        sentence_words = wp.tokenize(sentence)
        positive_sum = 0
        negative_sum = 0
        for word in sentence_words:
            if word in all_words_dict.keys():
                word_index = all_words_dict[word]
                positive_sum += math.log(positive_word_frequencies[word_index] + eps)
                negative_sum += math.log(negative_word_frequencies[word_index] + eps)
            else:
                positive_sum += logged_eps
                negative_sum += logged_eps
        if positive_sum >= negative_sum:
            predictions.append('positive')
        else:
            predictions.append('negative')   
    return predictions

train = pd.read_csv('./tweets_train.csv')
test = pd.read_csv('./tweets_test.csv')
predictions = predict(train, test)
test_sentiments = test[:]['airline_sentiment'].tolist()
true = 0
i = 0
for x, y in zip(predictions, test_sentiments):
    if x == y: 
        true += 1
#     else:
#         print(i)
#         print('true ', y)
#         print(test.loc[i]['text'])
    i += 1
print(true / len(predictions))
# print(predictions)

ValueError: matmul: Input operand 1 has a mismatch in its core dimension 0, with gufunc signature (n?,k),(k,m?)->(n?,m?) (size 8986 is different from 4312)