## KNN

In [104]:
# Class sizes of the data set.
cats_instances = 50
dogs_instances = 30
cows_instances = 20
total_instances = cats_instances + dogs_instances + cows_instances

# k=1
# NOTE(review): 100% presumably refers to accuracy measured on the training
# points themselves (each point is its own nearest neighbour) - confirm.
acc1 = 100.0

# k=total_instances
# Every query sees the whole data set, so the vote is always won by the largest
# class. Take the largest count instead of assuming that class is "cats".
majority_instances = max(cats_instances, dogs_instances, cows_instances)
acck = (majority_instances / total_instances) * 100

print("Accuracy for k=1:", acc1)
print("Accuracy for k=total_instances:", acck)

Accuracy for k=1: 100.0
Accuracy for k=total_instances: 50.0


## Naive Bayes

In [129]:
import numpy as np
from sklearn.metrics import precision_score, accuracy_score, recall_score, f1_score

In [130]:
import numpy as np
from scipy.stats import norm

In [131]:
class GaussianNaiveBayes:
    """Gaussian Naive Bayes classifier that prints its intermediate quantities.

    Each feature is modelled, per class, as an independent normal distribution.
    Prediction picks the class with the largest log-posterior
    ``log(prior) + sum(log N(x_j | mean_j, std_j))``.

    Attributes set by ``fit``:
        classes: sorted unique class labels.
        priors:  per-class relative frequency, shape (n_classes,).
        means:   per-class feature means, shape (n_classes, n_features).
        stds:    per-class feature standard deviations, same shape as ``means``.
    """

    # Smallest standard deviation allowed; keeps the Gaussian well defined when
    # a feature is constant within a class.
    _MIN_STD = 1e-9

    def __init__(self):
        self.classes = None
        self.priors = None
        self.means = None
        self.stds = None

    def fit(self, X, y):
        """Estimate priors, means and standard deviations from the training data.

        Args:
            X: 2-D array of shape (n_samples, n_features).
            y: 1-D array of class labels, one per row of ``X``.
        """
        n_samples, n_features = X.shape
        self.classes = np.unique(y)
        n_classes = len(self.classes)

        # Calculate priors
        self.priors = np.zeros(n_classes)
        for idx, c in enumerate(self.classes):
            self.priors[idx] = np.sum(y == c) / n_samples
            print(f"Class {c}'s Prior(P) is {self.priors[idx]:.3f}")

        # Calculate means and standard deviations for each feature in each class
        self.means = np.zeros((n_classes, n_features))
        self.stds = np.zeros((n_classes, n_features))
        for idx, c in enumerate(self.classes):
            X_c = X[y == c]
            self.means[idx, :] = np.mean(X_c, axis=0)
            self.stds[idx, :] = np.std(X_c, axis=0)
        # A zero std would make the log-density NaN; floor it instead.
        self.stds = np.maximum(self.stds, self._MIN_STD)

        for idx, c in enumerate(self.classes):
            for idx_f in range(n_features):
                print(f"Class {c}'s Feature {idx_f} Mean is {self.means[idx,idx_f]:.3f}")

    def predict(self, X):
        """Return the predicted class label for every row of ``X``."""
        y_pred = np.array([self._predict(x) for x in X])
        print(f"{X}'s Predict Class is {y_pred}")
        return y_pred

    def _predict(self, x):
        """Return the class with the highest log-posterior for one sample ``x``."""
        posteriors = []

        # Calculate posterior probabilities for each class
        for idx, c in enumerate(self.classes):
            prior = np.log(self.priors[idx])
            # logpdf works in log space directly. log(pdf(...)) underflows to
            # -inf for points far from every class, which made all classes tie.
            likelihood = np.sum(norm.logpdf(x, self.means[idx], self.stds[idx]))
            print(f"For {x},class {c}'s Likelihood is {likelihood:.3f}")
            posterior = prior + likelihood
            print(f"For {x},class {c}'s Posterior(L) is {posterior:.3f}")
            posteriors.append(posterior)

        # Select the class with the highest posterior probability
        return self.classes[np.argmax(posteriors)]

In [132]:
import pandas as pd

In [133]:
# Load the Naive Bayes demo data. header=None makes pandas treat the first CSV
# row as data rather than as column names.
df = pd.read_csv("demo-bayes.csv",header=None)

In [134]:
# Every column except the last is a feature; the last column is the class label.
X = df.iloc[:,:-1].to_numpy()
y = df.iloc[:,-1].to_numpy()

In [135]:
gnb = GaussianNaiveBayes()

In [136]:
X_train = X
y_train = y

In [137]:
gnb.fit(X_train, y_train)

Class 1's Prior(P) is 0.400
Class 2's Prior(P) is 0.333
Class 3's Prior(P) is 0.267
Class 1's Feature 0 Mean is 1.543
Class 1's Feature 1 Mean is 1.968
Class 2's Feature 0 Mean is 1.792
Class 2's Feature 1 Mean is 2.584
Class 3's Feature 0 Mean is 1.962
Class 3's Feature 1 Mean is 0.932


In [138]:
X_test = np.array([[2.0,2.1]])

In [141]:
y_pred = gnb.predict(X_test)

For [2.  2.1],class 1's Likelihood is -15.672
For [2.  2.1],class 1's Posterior(L) is -16.588
For [2.  2.1],class 2's Likelihood is -1.760
For [2.  2.1],class 2's Posterior(L) is -2.859
For [2.  2.1],class 3's Likelihood is -85.589
For [2.  2.1],class 3's Posterior(L) is -86.911
[[2.  2.1]]'s Predict Class is [2]


In [142]:
import numpy as np

In [111]:
df_cm = pd.read_csv("confusion_matrix.csv")

In [113]:
cm = df_cm.to_numpy()

In [114]:


# Extract the four counts from the confusion matrix.
# NOTE(review): the unpacking order TP, FN, FP, TN assumes the CSV stores the
# matrix as [[TP, FN], [FP, TN]] - confirm against confusion_matrix.csv.
TP, FN, FP, TN = cm.ravel()

# Compute the four metrics
precision = TP / (TP + FP)
accuracy = (TP + TN) / (TP + TN + FP + FN)
recall = TP / (TP + FN)
f1 = 2 * ((precision * recall) / (precision + recall))
# Print the results
print("TP: {:.3f}, TN: {:.3f}, FP: {:.3f}, FN: {:.3f}".format(TP, TN, FP, FN))
print("Precision: {:.3f}, Accuracy: {:.3f}, Recall: {:.3f}, F1: {:.3f}".format(precision, accuracy, recall, f1))

TP: 3.000, TN: 90.000, FP: 5.000, FN: 2.000
Precision: 0.375, Accuracy: 0.930, Recall: 0.600, F1: 0.462


## Confusion Matrix

In [116]:
import numpy as np
from sklearn.metrics import precision_score, accuracy_score, recall_score, f1_score
from sklearn.metrics import confusion_matrix

# Ground-truth labels and model predictions
y_true = [1, 0, 1, 1, 0, 1, 0, 1]
y_pred = [1, 1, 1, 0, 0, 1, 0, 1]

# Compute the confusion matrix
cm = confusion_matrix(y_true, y_pred)

# Extract the counts from the flattened matrix (unpacked as TN, FP, FN, TP)
TN, FP, FN, TP = cm.ravel()

# Compute the four metrics by hand
precision = TP / (TP + FP)
accuracy = (TP + TN) / (TP + TN + FP + FN)
recall = TP / (TP + FN)
f1 = 2 * ((precision * recall) / (precision + recall))

# Compute the same four metrics with sklearn as a cross-check
precision_skl = precision_score(y_true, y_pred)
accuracy_skl = accuracy_score(y_true, y_pred)
recall_skl = recall_score(y_true, y_pred)
f1_skl = f1_score(y_true, y_pred)

# Print both sets of results (first line: manual, second line: sklearn)
print("手动计算 Precision: {:.2f}, Accuracy: {:.2f}, Recall: {:.2f}, F1: {:.2f}".format(precision, accuracy, recall, f1))
print("sklearn计算 Precision: {:.2f}, Accuracy: {:.2f}, Recall: {:.2f}, F1: {:.2f}".format(precision_skl, accuracy_skl, recall_skl, f1_skl))

手动计算 Precision: 0.80, Accuracy: 0.75, Recall: 0.80, F1: 0.80
sklearn计算 Precision: 0.80, Accuracy: 0.75, Recall: 0.80, F1: 0.80


Training a k-NN classifier just involves storing the data, whereas making a prediction involves calculating the distance from the query point to every stored training point.

Training a k-NN classifier just involves storing the data, which means it takes the same time for any value of k. The value of k does, however, affect prediction time.

We should choose the hyperparameters based on performance on a validation set, not the held-out test set.

## linear regression

In [152]:
import pandas as pd
import numpy as np

In [153]:
df = pd.read_csv("line_regrassion.csv")

In [159]:
# Features are all columns but the last, flattened to 1-D; the last column is the target.
# NOTE(review): reshape(-1) only gives one value per row when there is exactly
# one feature column - confirm against line_regrassion.csv.
X = df.iloc[:,:-1].to_numpy().reshape(-1)
y = df.iloc[:,-1].to_numpy()


In [161]:
# Degree-1 least-squares polynomial fit; the two returned coefficients are
# unpacked as (gradient, intercept).
gradient, intercept = np.polyfit(X, y, 1)
print(f"The grad is {gradient:.3f} and the Inter is {intercept:.3f}")

The grad is -0.157 and the Inter is 63.072


## K-Means Cluster

In [1]:
import numpy as np
import pandas as pd
from collections import Counter

In [106]:
class KMeans:
    """Lloyd's k-means with verbose per-iteration printing.

    Args:
        n_clusters: number of clusters.
        max_iter: maximum number of assign/update iterations.
        random_state: seed used when the initial centroids are sampled from X.
        init_centroids: optional explicit starting centroids, one row per
            cluster. When omitted, ``n_clusters`` random rows of X are used.

    Raises:
        RuntimeError: if ``init_centroids`` does not contain exactly
            ``n_clusters`` rows.

    Attributes set by ``fit``:
        centroids_: final centroids, shape (n_clusters, n_features).
        labels_: cluster index assigned to each training row.
    """

    def __init__(self, n_clusters, max_iter=300, random_state=None,init_centroids=None):
        # np.array(None) is a 0-d object array, not None, so convert only when
        # centroids were actually supplied; otherwise the default would crash.
        self._init_centroids = None if init_centroids is None else np.array(init_centroids)
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.random_state = random_state
        if self._init_centroids is not None:
            if len(self._init_centroids) != n_clusters:
                raise RuntimeError("Please check your input")

    def get_init_centroids(self, X):
        """Pick ``n_clusters`` distinct rows of X at random as starting centroids."""
        if X.shape[0] < self.n_clusters:
            raise ValueError("n_clusters cannot exceed the number of samples")
        # A private generator gives the same draw as seeding the global one,
        # without resetting NumPy's global random state as a side effect.
        rng = np.random.RandomState(self.random_state)
        random_idx = rng.permutation(X.shape[0])
        centroids = X[random_idx[:self.n_clusters]]
        return centroids

    def _update_centroids(self, X, labels, prev_centroids=None):
        """Return the mean of each cluster's members.

        A cluster with no members keeps its previous centroid when
        ``prev_centroids`` is given (NaN otherwise), instead of producing a
        NaN mean that would poison every later distance computation.
        """
        centroids = np.zeros((self.n_clusters, X.shape[1]))
        for k in range(self.n_clusters):
            members = X[labels == k]
            if len(members) > 0:
                centroids[k, :] = members.mean(axis=0)
            elif prev_centroids is not None:
                centroids[k, :] = prev_centroids[k]
            else:
                centroids[k, :] = np.nan
        return centroids

    def _labels_closest_centroids(self, X, centroids):
        """Return, for each row of X, the index of the nearest centroid (Euclidean)."""
        distances = np.zeros((X.shape[0], self.n_clusters))
        for k in range(self.n_clusters):
            distances[:, k] = np.linalg.norm(X - centroids[k], axis=1)
        labels = distances.argmin(axis=1)
        return labels

    def fit(self, X):
        """Run k-means on X until the centroids stop moving or ``max_iter`` is hit."""
        if self._init_centroids is None:
            self.centroids_ = self.get_init_centroids(X)
        else:
            self.centroids_ = self._init_centroids
        n_iter = 0
        for n_iter in range(1, self.max_iter + 1):
            print(f"{n_iter} iter:")
            prev_centroids = self.centroids_
            print(f"Before Centroids:{prev_centroids}")
            self.labels_ = self._labels_closest_centroids(X, prev_centroids)
            for k in range(self.n_clusters):
                print(f"Indices in label {k}: {np.where(self.labels_ == k)[0]}")
            self.centroids_ = self._update_centroids(X, self.labels_, prev_centroids)
            print(f"After Centroids:{self.centroids_},Labels: {Counter(self.labels_)}")
            if np.allclose(self.centroids_, prev_centroids):
                break
        if n_iter == 0:
            # max_iter <= 0: no iteration ran, so label against the initial centroids.
            self.labels_ = self._labels_closest_centroids(X, self.centroids_)
        # Report the number of iterations actually run (the loop index was one short).
        print(f"Finish trainning using {n_iter} times iter")
        print(Counter(self.labels_))

    def predict(self, X):
        """Assign each row of X to its nearest fitted centroid."""
        labels = self._labels_closest_centroids(X, self.centroids_)
        return labels

In [107]:
df = pd.read_csv("demo_cluster.csv",header=None)
X = df.to_numpy()

In [108]:
# Two clusters, starting from the explicitly given centroids (0, 6) and (4, 2)
# rather than a random draw.
kmeans = KMeans(n_clusters=2,init_centroids=[[0,6],[4,2]])

In [109]:
kmeans.fit(X)

1 iter:
Before Centroids:[[0 6]
 [4 2]]
Indices in label 0: [1 2 4 5 7]
Indices in label 1: [0 3 6]
After Centroids:[[2.         4.4       ]
 [2.33333333 3.33333333]],Labels: Counter({0: 5, 1: 3})
2 iter:
Before Centroids:[[2.         4.4       ]
 [2.33333333 3.33333333]]
Indices in label 0: [2 4 5 6 7]
Indices in label 1: [0 1 3]
After Centroids:[[2.6        4.8       ]
 [1.33333333 2.66666667]],Labels: Counter({0: 5, 1: 3})
3 iter:
Before Centroids:[[2.6        4.8       ]
 [1.33333333 2.66666667]]
Indices in label 0: [4 5 6 7]
Indices in label 1: [0 1 2 3]
After Centroids:[[3.   5.  ]
 [1.25 3.  ]],Labels: Counter({1: 4, 0: 4})
4 iter:
Before Centroids:[[3.   5.  ]
 [1.25 3.  ]]
Indices in label 0: [4 6 7]
Indices in label 1: [0 1 2 3 5]
After Centroids:[[3.33333333 5.33333333]
 [1.4        3.2       ]],Labels: Counter({1: 5, 0: 3})
5 iter:
Before Centroids:[[3.33333333 5.33333333]
 [1.4        3.2       ]]
Indices in label 0: [4 6 7]
Indices in label 1: [0 1 2 3 5]
After Centroids:

## Decision Tree

In [89]:
import graphviz
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.tree import DecisionTreeClassifier, export_graphviz
from sklearn.preprocessing import LabelEncoder

In [111]:
def get_entropy(y):
    """Return the Shannon entropy, in bits, of the label distribution in ``y``.

    Args:
        y: array-like of class labels.

    Returns:
        ``-sum(p * log2(p))`` over the observed classes, where ``p`` is each
        class's relative frequency.
    """
    total = len(y)
    class_counts = np.unique(y, return_counts=True)[1]
    p = class_counts / total
    return -np.sum(p * np.log2(p))

In [119]:
def get_best_feature_name(clf, feature_names):
    """Return the name of the feature with the largest importance.

    Args:
        clf: fitted estimator exposing a ``feature_importances_`` array.
        feature_names: sequence of names, indexed like ``feature_importances_``.
    """
    # Position of the highest importance score selects the name.
    top_position = clf.feature_importances_.argmax()
    best_name = feature_names[top_position]
    return best_name

In [127]:
def get_best_split_feature_name(clf, feature_names):
    """Return the name of the feature used by the root split of a fitted tree.

    Args:
        clf: fitted tree estimator exposing ``tree_.feature``.
        feature_names: sequence of names, indexed like the training columns.

    Returns:
        The feature name of the root node, or ``None`` when the root is a leaf
        (the tree made no split at all).
    """
    root_feature_index = clf.tree_.feature[0]
    # Leaf nodes carry a negative sentinel index (scikit-learn uses -2). Using
    # it directly would silently pick a name from the end of the list.
    if root_feature_index < 0:
        return None
    return feature_names[root_feature_index]

In [120]:
df = pd.read_csv("demo-clf.csv")
df_new = pd.DataFrame()

In [121]:
# Label-encode every column of df (a fresh LabelEncoder per column) into df_new,
# then rebind df to the encoded frame. The fitted encoders are not kept, so the
# integer codes cannot be mapped back to the original values afterwards.
for d in df:
    le = LabelEncoder()
    df_new[d] = le.fit_transform(df[d])
df = df_new

In [122]:
X = df.iloc[:,:-1]
y = df.iloc[:,-1]

In [123]:
X

Unnamed: 0,Subject,Style,Colour
0,1,0,0
1,0,0,2
2,0,1,1
3,1,1,1
4,2,1,1
5,2,0,2
6,0,1,2
7,2,1,2


In [128]:
# Create a decision tree classifier (entropy criterion) and fit it to the dataset.
# Reports the entropy of the full label set, the feature with the largest
# importance, and the feature used by the root split.
print(f"root entropy is {get_entropy(y)}")
clf = DecisionTreeClassifier(criterion="entropy")
clf.fit(X.to_numpy(), y.to_numpy())
print(f"Most Important feature is {get_best_feature_name(clf,list(X.columns))}, and  the best feature to split on first is {get_best_split_feature_name(clf,list(X.columns))}")
# NOTE(review): the reported value is get_depth() minus one - confirm which
# depth convention the exercise expects.
print(f"Final depth is {clf.get_depth()-1}")

root entropy is 1.0
Most Important feature is Subject, and  the best feature to split on first is Colour
Final depth is 3


In [97]:
# Export the decision tree to a Graphviz format
dot_data = export_graphviz(clf, out_file=None, feature_names=X.columns, filled=True, rounded=True)

# Create a Graphviz object to visualize the decision tree
graph = graphviz.Source(dot_data)

# Display the decision tree
graph.view()

'Source.gv.pdf'

In [21]:
class DecisionTreeClassifier:
    """Minimal binary decision tree that splits on weighted Gini impurity.

    The fitted tree is stored in ``tree_`` as nested dicts: internal nodes are
    ``{'feature', 'threshold', 'left', 'right'}`` and leaves are ``{'label'}``.
    Samples with ``x[feature] <= threshold`` go left. Labels must be
    non-negative integers because ``np.bincount`` is used for counting.

    NOTE: this class reuses the name of the scikit-learn estimator imported
    earlier in this notebook and shadows it once this cell has run.

    Args:
        max_depth: maximum tree depth; ``None`` grows until nodes are pure or
            cannot be split any further.
    """

    def __init__(self, max_depth=None):
        self.max_depth = max_depth

    def _best_split(self, X, y):
        """Find the (feature, threshold) pair with the lowest weighted Gini.

        Returns ``(None, None)`` when no threshold separates the samples into
        two non-empty groups.
        """
        best_gini = np.inf
        best_feature = None
        best_threshold = None
        n_features = X.shape[1]

        for feature in range(n_features):
            # np.unique is sorted; the largest value would send every sample
            # left and leave an empty right child, so it is not a real split.
            thresholds = np.unique(X[:, feature])[:-1]
            for threshold in thresholds:
                mask = X[:, feature] <= threshold
                y_left, y_right = y[mask], y[~mask]
                gini = self._gini(y_left, y_right)
                if gini < best_gini:
                    best_gini = gini
                    best_feature = feature
                    best_threshold = threshold

        return best_feature, best_threshold

    def _gini(self, y_left, y_right):
        """Return the size-weighted Gini impurity of the two children."""
        n_left, n_right = len(y_left), len(y_right)
        n_total = n_left + n_right
        gini_left = 1 - np.sum((np.bincount(y_left) / n_left) ** 2)
        gini_right = 1 - np.sum((np.bincount(y_right) / n_right) ** 2)
        return (n_left * gini_left + n_right * gini_right) / n_total

    def _grow_tree(self, X, y, depth):
        """Recursively build the tree for the samples reaching this node."""
        n_samples, n_features = X.shape
        n_labels = len(np.unique(y))

        if n_labels == 1 or n_samples == 0 or depth == self.max_depth:
            return self._create_leaf(y)

        feature, threshold = self._best_split(X, y)
        if feature is None:
            # Identical feature rows with mixed labels: nothing left to split
            # on. Without this guard the same node would be rebuilt forever.
            return self._create_leaf(y)

        mask = X[:, feature] <= threshold
        left = self._grow_tree(X[mask], y[mask], depth + 1)
        right = self._grow_tree(X[~mask], y[~mask], depth + 1)
        return self._create_node(feature, threshold, left, right)

    def _create_leaf(self, y):
        """Build a leaf predicting the most frequent label (None if y is empty)."""
        if len(y) == 0:
            return {'label': None}
        return {'label': np.argmax(np.bincount(y))}

    def _create_node(self, feature, threshold, left, right):
        """Build an internal node."""
        return {'feature': feature, 'threshold': threshold, 'left': left, 'right': right}

    def fit(self, X, y):
        """Fit the decision tree to feature matrix X and integer labels y."""
        self.tree_ = self._grow_tree(X, y, depth=0)

    def _predict_sample(self, x, node):
        """Predict a single sample by walking from ``node`` down to a leaf."""
        if 'label' in node:
            return node['label']
        else:
            feature, threshold = node['feature'], node['threshold']
            if x[feature] <= threshold:
                return self._predict_sample(x, node['left'])
            else:
                return self._predict_sample(x, node['right'])

    def predict(self, X):
        """Predict a label for every row of X."""
        return np.array([self._predict_sample(x, self.tree_) for x in X])

In [None]:
class DecisionTreeClassifier:
    """Minimal binary decision tree that splits on information gain (entropy).

    The fitted tree is stored in ``tree_`` as nested dicts: internal nodes are
    ``{'feature', 'threshold', 'left', 'right'}`` and leaves are ``{'label'}``.
    Samples with ``x[feature] <= threshold`` go left. Labels must be
    non-negative integers because ``np.bincount`` is used for counting.

    NOTE: this class reuses the name of the scikit-learn estimator imported
    earlier in this notebook, and of the Gini-based class defined in the
    previous cell; whichever cell runs last wins.

    Args:
        max_depth: maximum tree depth; ``None`` grows until nodes are pure or
            cannot be split any further.
    """

    def __init__(self, max_depth=None):
        self.max_depth = max_depth

    def _best_split(self, X, y):
        """Find the (feature, threshold) pair with the highest information gain.

        Returns ``(None, None)`` when no threshold separates the samples into
        two non-empty groups.
        """
        best_gain = -np.inf
        best_feature = None
        best_threshold = None
        n_features = X.shape[1]

        for feature in range(n_features):
            # np.unique is sorted; the largest value would send every sample
            # left and leave an empty right child, so it is not a real split.
            thresholds = np.unique(X[:, feature])[:-1]
            for threshold in thresholds:
                mask = X[:, feature] <= threshold
                y_left, y_right = y[mask], y[~mask]
                gain = self._information_gain(y, y_left, y_right)
                if gain > best_gain:
                    best_gain = gain
                    best_feature = feature
                    best_threshold = threshold

        return best_feature, best_threshold

    def _entropy(self, y):
        """Return the Shannon entropy (bits) of the labels in y."""
        n_samples = len(y)
        _, counts = np.unique(y, return_counts=True)
        probabilities = counts / n_samples
        entropy = -np.sum(probabilities * np.log2(probabilities))
        return entropy

    def _information_gain(self, y, y_left, y_right):
        """Return parent entropy minus the size-weighted entropy of the children."""
        p_left = len(y_left) / len(y)
        p_right = len(y_right) / len(y)
        gain = self._entropy(y) - (p_left * self._entropy(y_left) + p_right * self._entropy(y_right))
        return gain

    def _grow_tree(self, X, y, depth):
        """Recursively build the tree for the samples reaching this node."""
        n_samples, n_features = X.shape
        n_labels = len(np.unique(y))

        if n_labels == 1 or n_samples == 0 or depth == self.max_depth:
            return self._create_leaf(y)

        feature, threshold = self._best_split(X, y)
        if feature is None:
            # Identical feature rows with mixed labels: nothing left to split
            # on. Without this guard the same node would be rebuilt forever.
            return self._create_leaf(y)

        mask = X[:, feature] <= threshold
        left = self._grow_tree(X[mask], y[mask], depth + 1)
        right = self._grow_tree(X[~mask], y[~mask], depth + 1)
        return self._create_node(feature, threshold, left, right)

    def _create_leaf(self, y):
        """Build a leaf predicting the most frequent label (None if y is empty)."""
        if len(y) == 0:
            return {'label': None}
        return {'label': np.argmax(np.bincount(y))}

    def _create_node(self, feature, threshold, left, right):
        """Build an internal node."""
        return {'feature': feature, 'threshold': threshold, 'left': left, 'right': right}

    def fit(self, X, y):
        """Fit the decision tree to feature matrix X and integer labels y."""
        self.tree_ = self._grow_tree(X, y, depth=0)

    def _predict_sample(self, x, node):
        """Predict a single sample by walking from ``node`` down to a leaf."""
        if 'label' in node:
            return node['label']
        else:
            feature, threshold = node['feature'], node['threshold']
            if x[feature] <= threshold:
                return self._predict_sample(x, node['left'])
            else:
                return self._predict_sample(x, node['right'])

    def predict(self, X):
        """Predict a label for every row of X."""
        return np.array([self._predict_sample(x, self.tree_) for x in X])

## Uniform Cost Search

In [95]:
import heapq

In [96]:
class Node:
    """Graph vertex with a name and a mapping of neighbour -> edge cost."""

    def __init__(self, name, neighbors=None):
        self.name = name
        # None or an empty mapping both result in a fresh, empty dict.
        self.neighbors = neighbors or {}

    def __lt__(self, other):
        # Order nodes by name so that (cost, node) tuples with equal cost can
        # still be compared.
        mine, theirs = self.name, other.name
        return mine < theirs

In [97]:
def reconstruct_path(came_from, current_node):
    """Return the list of node names from the search start to ``current_node``.

    Args:
        came_from: mapping of node -> predecessor node on its best known path.
        current_node: node at which the path ends.
    """
    names = []
    node = current_node
    # Walk the predecessor links backwards until a node without one is reached.
    while True:
        names.append(node.name)
        if node not in came_from:
            break
        node = came_from[node]
    names.reverse()
    return names

In [98]:
def uniform_cost_search(start, goal):
    """Find a cheapest path from ``start`` to ``goal`` with uniform-cost search.

    Prints every node as it is expanded, together with its expansion order.

    Args:
        start: node to search from; nodes expose ``name`` and ``neighbors``
            (a mapping of neighbour node -> edge cost).
        goal: node to reach.

    Returns:
        List of node names from start to goal, or ``None`` if the goal is
        unreachable.
    """
    open_set = [(0, start)]  # Set the initial cost to 0
    heapq.heapify(open_set)
    came_from = {}
    g_cost = {start: 0}
    node_expansion_order = 0

    while open_set:
        popped_cost, current = heapq.heappop(open_set)
        # The heap may still hold an older, more expensive entry for a node
        # whose cost was improved later. Skip it, otherwise the node would be
        # expanded (and counted) a second time.
        if popped_cost > g_cost[current]:
            continue
        node_expansion_order += 1
        print(f"Node expansion order: {node_expansion_order}\tNode: {current.name}")

        if current == goal:
            return reconstruct_path(came_from, current)

        for neighbor, cost in current.neighbors.items():
            tentative_g_cost = g_cost[current] + cost
            if tentative_g_cost < g_cost.get(neighbor, float('inf')):
                came_from[neighbor] = current
                g_cost[neighbor] = tentative_g_cost
                heapq.heappush(open_set, (tentative_g_cost, neighbor))  # f_cost = g_cost

    return None

In [99]:
# Build the example graph. D, E, F and U are created but never given outgoing
# edges, and D, E, F are not referenced by any other node.
H = Node("H")
A = Node("A")
B = Node("B")
C = Node("C")
D = Node("D")
E = Node("E")
F = Node("F")
G = Node("G")
Q = Node("Q")
U = Node("U")

# Edge costs, keyed by neighbour node. U appears only as a target (from C and G).
H.neighbors = {A: 1.8, B: 1.8,C: 1.7}
A.neighbors = {H: 1.8, B: 1,Q: 0.2}
B.neighbors = {H: 1.8, A: 1}
C.neighbors = {H: 1.7, U: 1.5}
Q.neighbors = {A: 0.2, G:0.9}
G.neighbors = {U: 0.7, Q: 0.9}

# Search from H to G. uniform_cost_search returns None when no path exists,
# in which case the join below would raise.
path = uniform_cost_search(H, G)
print("Path from H to G:", " -> ".join(path))

Node expansion order: 1	Node: H
Node expansion order: 2	Node: C
Node expansion order: 3	Node: A
Node expansion order: 4	Node: B
Node expansion order: 5	Node: Q
Node expansion order: 6	Node: G
Path from H to G: H -> A -> Q -> G


## A Star Search

In [87]:
import heapq

In [88]:
class Node:
    """Graph vertex with a name, neighbour -> edge cost mapping and heuristic value."""

    def __init__(self, name, neighbors=None, h_cost=0):
        self.name = name
        # None or an empty mapping both result in a fresh, empty dict.
        self.neighbors = neighbors or {}
        # Heuristic estimate stored on the node itself.
        self.h_cost = h_cost

    def __lt__(self, other):
        # Order nodes by name so that (cost, node) tuples with equal cost can
        # still be compared.
        mine, theirs = self.name, other.name
        return mine < theirs

In [89]:
def reconstruct_path(came_from, current_node):
    """Return the list of node names from the search start to ``current_node``.

    Args:
        came_from: mapping of node -> predecessor node on its best known path.
        current_node: node at which the path ends.
    """
    trail = [current_node.name]
    step = current_node
    # Follow predecessor links, prepending each name so the list stays in
    # start-to-end order.
    while step in came_from:
        step = came_from[step]
        trail.insert(0, step.name)
    return trail

In [92]:
def a_star_search(start, goal):
    """Find a path from ``start`` to ``goal`` with A* search.

    Nodes are ordered by ``f = g + h`` where ``g`` is the cost so far and ``h``
    is the node's ``h_cost``. Prints every node as it is expanded, and the
    ``g_cost`` table once the goal is reached.

    Args:
        start: node to search from; nodes expose ``name``, ``neighbors``
            (neighbour node -> edge cost) and ``h_cost``.
        goal: node to reach.

    Returns:
        List of node names from start to goal, or ``None`` if the goal is
        unreachable.
    """
    open_set = [(start.h_cost, start)]
    heapq.heapify(open_set)
    came_from = {}
    g_cost = {start: 0}
    node_expansion_order = 0

    while open_set:
        popped_f_cost, current = heapq.heappop(open_set)
        # The heap may still hold an older entry pushed before a cheaper route
        # to this node was found. Its f value is larger than the current
        # g + h, so skip it instead of expanding the node a second time.
        if popped_f_cost > g_cost[current] + current.h_cost:
            continue
        node_expansion_order += 1
        print(f"Node expansion order: {node_expansion_order}\tNode: {current.name}")

        if current == goal:
            print(g_cost)
            return reconstruct_path(came_from, current)

        for neighbor, cost in current.neighbors.items():
            tentative_g_cost = g_cost[current] + cost
            if tentative_g_cost < g_cost.get(neighbor, float('inf')):
                came_from[neighbor] = current
                g_cost[neighbor] = tentative_g_cost
                f_cost = tentative_g_cost + neighbor.h_cost
                heapq.heappush(open_set, (f_cost, neighbor))

    return None


In [94]:
# Build the example graph; h_cost is each node's heuristic value (0 at G).
H = Node("H",h_cost=2)
A = Node("A",h_cost=1)
B = Node("B",h_cost=0.8)
C = Node("C",h_cost=1.5)
G = Node("G",h_cost=0)
Q = Node("Q",h_cost=0.75)
U = Node("U",h_cost=0.5)

# Edge costs, keyed by neighbour node. U is given no outgoing edges.
H.neighbors = {A: 1.8, B: 1.8,C: 1.7}
A.neighbors = {H: 1.8, B: 1,Q: 0.2}
B.neighbors = {H: 1.8, A: 1}
C.neighbors = {H: 1.7, U: 1.5}
Q.neighbors = {A: 0.2, G:0.9}
G.neighbors = {U: 0.7, Q: 0.9}

# Search from H to G. The node names are joined without a separator here.
# a_star_search returns None when no path exists, in which case join would raise.
path = a_star_search(H, G)
print("Path from H to G:", "".join(path))

Node expansion order: 1	Node: H
Node expansion order: 2	Node: B
Node expansion order: 3	Node: A
Node expansion order: 4	Node: Q
Node expansion order: 5	Node: G
{<__main__.Node object at 0x00000211B9EEE8F0>: 0, <__main__.Node object at 0x00000211B9ECE1A0>: 1.8, <__main__.Node object at 0x00000211B9ECE170>: 1.8, <__main__.Node object at 0x00000211B9EEF7F0>: 1.7, <__main__.Node object at 0x00000211B9EEF2E0>: 2.0, <__main__.Node object at 0x00000211B9EEE7D0>: 2.9}
Path from H to G: HAQG


## Bayesian Network

In [77]:
import numpy as np
import pandas as pd
from pgmpy.models import BayesianNetwork
from pgmpy.factors.discrete import TabularCPD
from pgmpy.inference import VariableElimination

In [78]:
model = BayesianNetwork([('D', 'H'), ('D', 'C'), ('F', 'C')])

In [79]:
# Priors for the two root variables: P(D=0)=0.7, P(D=1)=0.3 and P(F=0)=0.6, P(F=1)=0.4.
CPD_D = TabularCPD(variable='D', variable_card=2, values=[[0.7], [0.3]])
CPD_F = TabularCPD(variable='F', variable_card=2, values=[[0.6], [0.4]])
# P(H | D): one column per value of D, one row per value of H.
CPD_H_given_D = TabularCPD(variable='H', variable_card=2, values=[[0.9,0.5], [0.1, 0.5]],
                          evidence=['D'], evidence_card=[2])
# P(C | D, F): one column per (D, F) combination, one row per value of C.
# NOTE(review): the variable is named "..._given_FD" but the evidence order is
# ['D', 'F']; the column order depends on that list - confirm the four columns
# match the intended (D, F) combinations.
CPD_C_given_FD = TabularCPD(variable='C', variable_card=2,
                            values=[[0.9, 0.1, 0.2, 0],
                                    [0.1, 0.9, 0.8, 1]],
                            evidence=['D', 'F'],
                            evidence_card=[2, 2])

In [80]:
model.add_cpds(CPD_D, CPD_F, CPD_H_given_D, CPD_C_given_FD)

In [81]:
model.check_model()

True

In [82]:
# Marginal of H with no evidence; report the entry for state 1.
inference = VariableElimination(model)
result = inference.query(variables=['H'], joint=False)
prob_H_1 = result['H'].values[1]
print(f"P(H=1): {prob_H_1:.4f}")

P(H=1): 0.2200


In [84]:
# Perform inference on the Bayesian model
inference = VariableElimination(model)
# Query the posterior of D given the evidence H=1, i.e. P(D=1 | H=1)
result = inference.query(variables=['D'], evidence={'H': 1}, joint=False)
prob_D_1_given_H_1 = result['D'].values[1]
print(f"P(D=1|H=1): {prob_D_1_given_H_1:.4f}")

P(D=1|H=1): 0.6818


In [85]:
# Perform inference on the Bayesian model
inference = VariableElimination(model)

# Query the posterior of D given the evidence C=1 and F=1, i.e. P(D=1 | C=1, F=1).
# The previous comment, variable name and printed label described a different
# query (P(F=1|C=1, S=1)) than the one this code runs.
result = inference.query(variables=['D'], evidence={'C': 1, 'F': 1}, joint=False)
prob_D_1_given_C_1_F_1 = result['D'].values[1]
# Old (misleading) name kept as an alias in case other cells still reference it.
prob_F_1_given_C_1_S_1 = prob_D_1_given_C_1_F_1

print(f"P(D=1|C=1, F=1): {prob_D_1_given_C_1_F_1:.4f}")

P(F=1|C=1, S=1): 0.3226


## Grid World

In [28]:
# Reward of each of the three positions on the 1-D track.
rewards = [1, -1, 10]
# Utilities start out equal to the rewards.
utilities = rewards.copy()
discount_factor = 0.5
# NOTE(review): possible_actions is not read anywhere in this cell.
possible_actions = ["F", "B"]
# Filled with the chosen action code per position by the loop below.
best_actions = []

def action_expected_utility(action, current_position, current_utilities):
    """Return the expected utility of taking ``action`` at ``current_position``.

    The intended move succeeds with probability 0.9; with probability 0.1 the
    agent moves in the opposite direction. Moves past either end of the track
    are clamped to the end position.

    Args:
        action: "forward" / "F" or "backward" / "B".
        current_position: index into ``current_utilities``.
        current_utilities: sequence of utilities, one per position.

    Raises:
        ValueError: if ``action`` is not one of the accepted names. Previously
            any other string, including the short code "F", was silently
            treated as "backward".
    """
    forward_position = min(current_position + 1, len(current_utilities) - 1)
    backward_position = max(current_position - 1, 0)
    forward_value = current_utilities[forward_position]
    backward_value = current_utilities[backward_position]

    if action in ("forward", "F"):
        return 0.9 * forward_value + 0.1 * backward_value
    if action in ("backward", "B"):
        return 0.1 * forward_value + 0.9 * backward_value
    raise ValueError(f"Unknown action: {action!r}")

# One synchronous update: every new utility is computed from the old
# `utilities` list and collected in `new_utilities`.
new_utilities = []

for position in range(len(utilities)):
    # Position index 2 is hard-coded as the goal; its utility is carried over unchanged.
    if position == 2: # Goal reached, game is finished
        new_utilities.append(utilities[position])
        best_actions.append("N/A")
    else:
        forward_utility = action_expected_utility("forward", position, utilities)
        backward_utility = action_expected_utility("backward", position, utilities)
        print(f"at position {position}, the forward utility is {forward_utility} and the backward utility is {backward_utility} ")
        
        # Select the best action based on the utilities (ties go to backward)
        if forward_utility > backward_utility:
            best_action = "F"
            print("Go forward")
        else:
            best_action = "B"
            print("Go Backward")

        # New utility = reward of this position + discounted best expected utility
        new_utility = rewards[position] + discount_factor * max(forward_utility, backward_utility)
        new_utilities.append(new_utility)
        best_actions.append(best_action)

print(f"After one iteration, the utilities are: {new_utilities}")
print("The agent tries to take these actions:")
print(best_actions)

at position 0, the forward utility is -0.8 and the backward utility is 0.8 
Go Backward
at position 1, the forward utility is 9.1 and the backward utility is 1.9 
Go forward
After one iteration, the utilities are: [1.4, 3.55, 10]
The agent tries to take these actions:
['B', 'F', 'N/A']


After one iteration, the utilities are:
[0.8500000000000001, -0.95, 6.61]
[3.5, 6.76, 10]

The agent tries to take these actions:
['D', 'U', 'D']
['D', 'R', 'G']


In [48]:
# Neighbour coordinates of cell (row, col) in an nrows x ncols grid.
# Moves that would leave the grid are clamped, so they map back onto the
# edge cell itself.
row,col = (0,0)
nrows = 2
ncols = 2
neigh = {"U": [max(row - 1, 0), col],
             "D": [min(row + 1, nrows-1), col],
             "L": [row, max(col - 1, 0)],
             "R": [row, min(col + 1, ncols-1)],
            }

In [49]:
neigh

{'U': [0, 0], 'D': [1, 0], 'L': [0, 0], 'R': [0, 1]}

In [53]:
import numpy as np

# rewards and transition model (2 rows x 3 columns; cell [1, 2] is the goal)
rewards = np.array([[-0.5, -0.5, -0.5], [2, -0.5, 10]])
transition_probs = {'up': 0.8, 'left': 0.1, 'right': 0.1}
gamma = 0.9

# initializing utilities
utilities = rewards.copy()

# value iteration for one round
# `utilities` is updated in place, so cells visited later in the sweep already
# see the new values of cells visited earlier.
# NOTE(review): each candidate below is a single outcome weighted by its
# transition probability, and the update takes the max over those candidates
# rather than summing outcomes into an expectation per action - confirm this
# is the intended model.
for row in range(2):
    for col in range(3):
        if row == 1 and col == 2:
            continue  # goal state

        actions = []

        # For row 0 the "up" candidate reads the cell at row + 1.
        if row == 0:  # can only move up with 80% probability
            actions.append(transition_probs['up'] * (rewards[row + 1, col] + gamma * utilities[row + 1, col]))

        if col > 0:  # can move left
            actions.append(transition_probs['left'] * (rewards[row, col - 1] + gamma * utilities[row, col - 1]))

        if col < 2:  # can move right
            actions.append(transition_probs['right'] * (rewards[row, col + 1] + gamma * utilities[row, col + 1]))

        utilities[row, col] = rewards[row, col] + gamma * max(actions)

# The printed row/column numbers are 1-based; the array indices are 0-based.
print(f'The utility of cell row 1, column 2 is {utilities[0, 1]:.2f}')
print(f'The utility of cell row 1, column 3 is {utilities[0, 2]:.2f}')
print(f'The utility of cell row 2, column 1 is {utilities[1, 0]:.2f}')
print(f'The utility of cell row 2, column 2 is {utilities[1, 1]:.2f}')

The utility of cell row 1, column 2 is -0.36
The utility of cell row 1, column 3 is 13.18
The utility of cell row 2, column 1 is 1.91
The utility of cell row 2, column 2 is 1.21


In [55]:
import numpy as np

reward_grid = np.array([[2,-0.5,10],
                        [-0.5, -0.5, -0.5]])

utility_grid = np.copy(reward_grid)

def expected_utility(current_utilities, action, discount=0.9, transition_probabilities=[0.8, 0.1, 0.1]):
    """Return, for every grid cell, the probability-weighted utility of ``action``.

    The result is built by adding shifted copies of ``current_utilities``,
    weighted by the three transition probabilities: ``probs[0]`` for the
    intended direction and ``probs[1]`` / ``probs[2]`` for the two
    perpendicular directions.

    Args:
        current_utilities: 2-D array of utilities, one entry per grid cell.
        action: one of "up", "down", "left", "right". Any other value returns
            an all-zero array.
        discount: not used in the body.
        transition_probabilities: three weights, in the order described above.

    Returns:
        Array with the same shape and dtype as ``current_utilities``.
    """
    up, down, left, right = "up", "down", "left", "right"

    probs = np.array(transition_probabilities)
    utilities = np.zeros_like(current_utilities)

    # In every branch the second line makes cells on the edge facing the
    # intended direction receive their own value (bumping into the wall).
    # NOTE(review): the perpendicular terms have no such wall handling, so
    # border cells accumulate a total weight below 1 - confirm this is intended.
    if action == up:
        # Each row receives the value of the row above it.
        utilities[1:, :] += current_utilities[:-1, :] * (probs[0])
        utilities[0, :] += current_utilities[0, :] * (probs[0])
        utilities[:, :-1] += current_utilities[:, 1:] * (probs[1])
        utilities[:, 1:] += current_utilities[:, :-1] * (probs[2])

    elif action == down:
        # Each row receives the value of the row below it.
        utilities[:-1, :] += current_utilities[1:, :] * (probs[0])
        utilities[-1, :] += current_utilities[-1, :] * (probs[0])
        utilities[:, 1:] += current_utilities[:, :-1] * (probs[1])
        utilities[:, :-1] += current_utilities[:, 1:] * (probs[2])

    elif action == left:
        # Each column receives the value of the column to its left.
        utilities[:, 1:] += current_utilities[:, :-1] * (probs[0])
        utilities[:, 0] += current_utilities[:, 0] * (probs[0])
        utilities[1:, :] += current_utilities[:-1, :] * (probs[1])
        utilities[:-1, :] += current_utilities[1:, :] * (probs[2])

    elif action == right:
        # Each column receives the value of the column to its right.
        utilities[:, :-1] += current_utilities[:, 1:] * (probs[0])
        utilities[:, -1] += current_utilities[:, -1] * (probs[0])
        utilities[:-1, :] += current_utilities[1:, :] * (probs[1])
        utilities[1:, :] += current_utilities[:-1, :] * (probs[2])

    return utilities

# Action names; value_iteration below reads this module-level list.
actions = ["up", "down", "left", "right"]

# Applies one update per action in sequence, each one building on the previous
# result. NOTE(review): the resulting utility_grid is overwritten later in this
# cell by `utility_grid = value_iteration(reward_grid)`, so this loop does not
# affect the final answer.
for action in actions:
    expected_u = expected_utility(utility_grid, action)
    utility_grid = reward_grid + 0.9 * expected_u

def value_iteration(reward_grid):
    """Iterate the utility update until the largest change drops below 1e-6.

    Each sweep evaluates ``expected_utility`` for every entry of the
    module-level ``actions`` list, keeps the element-wise maximum, and sets
    ``utility = reward_grid + 0.9 * best``. The grid is printed after every
    sweep.

    Args:
        reward_grid: 2-D array of per-cell rewards; also the starting utilities.

    Returns:
        The utility grid of the final sweep.
    """
    threshold = 1e-6
    utility_grid = np.copy(reward_grid)

    converged = False
    while not converged:
        prev_utility_grid = np.copy(utility_grid)
        # One candidate grid per action, all computed from the previous sweep.
        action_values = [expected_utility(prev_utility_grid, action) for action in actions]

        utility_grid = reward_grid + 0.9 * np.max(action_values, axis=0)
        print(utility_grid)

        diff = np.max(np.abs(utility_grid - prev_utility_grid))
        converged = diff < threshold

    return utility_grid

# Run value iteration to convergence, then compare two cells of the bottom row.
utility_grid = value_iteration(reward_grid)

# Prints "T" when cell [1, 2] has a higher converged utility than cell [1, 1].
# NOTE(review): the name suggests this decides whether the agent reaches the
# goal - confirm that comparing these two cells answers that question.
will_reach_goal = (utility_grid[1, 2] > utility_grid[1, 1])
print("T" if will_reach_goal else "F")

[[ 3.395  6.655 17.155]
 [ 0.895 -0.905  6.655]]
[[ 6.87215 11.77015 22.95055]
 [ 1.86295  4.9711  11.77015]]
[[10.6421735 16.471795  27.5837095]
 [ 4.895347   9.201487  16.471795 ]]
[[14.30027363 20.18840467 31.34273239]
 [ 7.99049875 13.28273518 20.18840467]]
[[17.25479625 23.26221349 34.38372374]
 [10.99164318 16.57175267 23.26221349]]
[[19.7380416  25.74773883 36.84988031]
 [13.41491104 19.33164081 25.74773883]]
[[21.74571395 27.77176149 38.84921032]
 [15.45123762 21.56301045 27.77176149]]
[[23.38627966 29.41210237 40.47088996]
 [17.09758499 23.3857382  29.41210237]]
[[24.71549635 30.74375721 41.78612999]
 [18.44283779 24.86258557 30.74375721]]
[[25.79536059 31.82364629 42.85295174]
 [19.62545628 26.06229874 31.82364629]]
[[26.67931639 32.69973214 43.71825342]
 [20.58643755 27.04344456 32.69973214]]
[[27.39658652 33.41105247 44.42011835]
 [21.37241856 27.83956241 33.41105247]]
[[27.97947545 33.98804583 44.98947994]
 [22.01017772 28.48647017 33.98804583]]
[[28.45230899 34.45620787 4

## Crossover

In [56]:
def single_point_crossover(a, b, point):
    """Swap the tails of two sequences at index ``point``.

    Returns:
        A pair: (head of ``a`` + tail of ``b``, head of ``b`` + tail of ``a``).
    """
    head_a, tail_a = a[:point], a[point:]
    head_b, tail_b = b[:point], b[point:]
    return head_a + tail_b, head_b + tail_a

def swap_mutation(individual, p1, p2):
    """Return ``individual`` as a string with the items at ``p1`` and ``p2`` exchanged."""
    genes = [*individual]
    first, second = genes[p1], genes[p2]
    genes[p1] = second
    genes[p2] = first
    return ''.join(genes)

def decode_binary(binary_str):
    """Interpret ``binary_str`` as a base-2 number and return it as an int."""
    value = int(binary_str, base=2)
    return value

In [57]:
# Two parent bit strings, the crossover index, and one (p1, p2) swap pair per child.
A = '1101001'
B = '1010010'
crossover_point = 3
mutation_points = [(2, 4), (3, 6)]

# Crossover first, then apply the first swap pair to A' and the second to B'.
A_prime, B_prime = single_point_crossover(A, B, crossover_point)
A_prime = swap_mutation(A_prime, *mutation_points[0])
B_prime = swap_mutation(B_prime, *mutation_points[1])

# Decode the resulting bit strings as base-2 integers.
decimal_A_prime = decode_binary(A_prime)
decimal_B_prime = decode_binary(B_prime)

print("A' =", A_prime, ", Decimal value =", decimal_A_prime)
print("B' =", B_prime, ", Decimal value =", decimal_B_prime)

A' = 1100010 , Decimal value = 98
B' = 1011001 , Decimal value = 89
