In [10]:
## MATH AND DATA PROCESSING
import pandas as pd
import numpy as np
from scipy.special import gammaln
from math import lgamma, log
from sklearn.preprocessing import OrdinalEncoder

## PLOT
import matplotlib.pyplot as plt
import networkx as nx

## OS and sys
import sys
import os
import warnings
from itertools import product, chain

import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split

In [11]:
from pgmpy.estimators import HillClimbSearch, BicScore, PC, ParameterEstimator
from pgmpy.estimators import MaximumLikelihoodEstimator as MLE

from pgmpy.models import BayesianNetwork

from pgmpy.factors.discrete import TabularCPD
from pgmpy.global_vars import SHOW_PROGRESS

from tqdm.auto import tqdm
from pgmpy.base import DAG

In [12]:
# Run the local script ML_score.py so that the names it defines are available in this
# notebook. The `MLScore` scoring class used by the structure-learning cells is
# presumably defined there — TODO confirm.
%run ML_score.py

In [28]:
# NOTE(review): despite its name, `x_test` holds the contents of x_train.csv. It is the
# frame that is later binned into `pearson_data` / `hc_data`, i.e. the data the models
# are fitted on. The name is kept because later cells reference it.
x_test = pd.read_csv('./data/x_train.csv')

In [14]:
class PreprocessDataParameter:
    """Discretise four continuous columns of a data frame into equal-width bins.

    The columns 'Teplota vzduchu', 'Rychlost otáček', 'Kroutící moment' and
    'Opotřebení nástroje' are cut into intervals and stored as the new
    categorical columns 'T', 'R', 'K' and 'O'.

    Parameters
    ----------
    NODE_SIZE
        Node size forwarded to ``networkx.draw`` by :meth:`plot`.
    NUM_BINS
        Divisor used to derive the bin width of 'T', 'R' and 'K'.
    NUM_BINS_SMALL
        Divisor used to derive the bin width of 'O'.
    """

    def __init__(self, NODE_SIZE, NUM_BINS, NUM_BINS_SMALL):
        self.NODE_SIZE = NODE_SIZE
        self.NUM_BINS = NUM_BINS
        self.NUM_BINS_SMALL = NUM_BINS_SMALL

    def plot(self, model):
        """Draw ``model`` with node labels and return the result of ``plt.show()``."""
        plt.figure(figsize=(8, 6), dpi=100)

        # Fixed: the attribute set in __init__ is NODE_SIZE; `self.node_sizes`
        # was never defined and raised AttributeError.
        nx.draw(model, with_labels=True, node_size=self.NODE_SIZE)

        plt.axis('off')
        axis = plt.gca()
        # Scale both axis limits by 1.2 (presumably so nodes near the border
        # are not clipped).
        axis.set_xlim([1.2 * x for x in axis.get_xlim()])
        axis.set_ylim([1.2 * y for y in axis.get_ylim()])

        return plt.show()

    def get_disc_data(self, disc_data):
        """Return ``disc_data`` together with the ``cutting_cond`` tuple of each column.

        Note that every column, including 'Opotřebení nástroje', goes through
        :meth:`cutting_cond` here (not :meth:`cutting_cond_small`).
        """
        teplota = self.cutting_cond(disc_data['Teplota vzduchu'])
        rychlost = self.cutting_cond(disc_data['Rychlost otáček'])
        krout = self.cutting_cond(disc_data['Kroutící moment'])
        opo = self.cutting_cond(disc_data['Opotřebení nástroje'])

        return disc_data, teplota, rychlost, krout, opo

    @staticmethod
    def _value_range(x):
        """Return ``(min, max)`` of ``x``, ignoring missing values.

        The builtin ``min``/``max`` give order-dependent (possibly NaN) results
        when NaN is present; the pandas reductions skip NaN.
        """
        values = pd.Series(x)
        return values.min(), values.max()

    @staticmethod
    def _bin_edges(cond, n_edges):
        """Return exactly ``n_edges`` edges ``lo, lo + step, ...`` for a condition tuple.

        ``np.arange(lo, hi, step)`` with a float step may yield one edge more
        than expected because of rounding, which makes ``pd.cut`` reject the
        label list; building the edges from an integer range avoids that.
        """
        lo, _, step = cond
        return lo + step * np.arange(n_edges)

    def cutting_cond(self, x):
        """Return ``(min, max, (max - min) / NUM_BINS)`` for the values in ``x``."""
        lo, hi = self._value_range(x)
        return (lo, hi, (hi - lo) / self.NUM_BINS)

    def cutting_cond_small(self, x):
        """Return ``(min, max + 20, (max + 20 - min) / NUM_BINS_SMALL)`` for ``x``."""
        lo, hi = self._value_range(x)
        hi = hi + 20
        return (lo, hi, (hi - lo) / self.NUM_BINS_SMALL)

    def bin_data(self, x_test, data_to_be_replaced = None):
        """Return a binned copy of ``x_test`` with rows containing any NaN removed.

        Parameters
        ----------
        x_test : pandas.DataFrame
            Frame containing the four source columns; it is copied, not modified.
        data_to_be_replaced
            Unused; kept so existing callers keep working.

        Returns
        -------
        pandas.DataFrame
            Copy of the input with the added columns 'T', 'R', 'K', 'O'
            (labels ``0 .. n_bins - 2``), after ``dropna(how='any')``.

        Notes
        -----
        The edges are ``min, min + step, ..., min + (n_bins - 1) * step`` and the
        intervals are right-closed, so a value equal to the column minimum or
        above the last edge gets no bin and its row is dropped.
        NOTE(review): confirm this loss of the extreme values is intended.
        """
        disc_data = x_test.copy()

        # (new column, source column, width rule, number of edges)
        specs = (
            ('T', 'Teplota vzduchu', self.cutting_cond, self.NUM_BINS),
            ('R', 'Rychlost otáček', self.cutting_cond, self.NUM_BINS),
            ('K', 'Kroutící moment', self.cutting_cond, self.NUM_BINS),
            ('O', 'Opotřebení nástroje', self.cutting_cond_small, self.NUM_BINS_SMALL),
        )
        for target, source, cond_fn, n_bins in specs:
            cond = cond_fn(disc_data[source])
            disc_data[target] = pd.cut(
                x = disc_data[source],
                bins = self._bin_edges(cond, n_bins),
                labels = np.arange(0, n_bins - 1, 1),
            )

        return disc_data.dropna(how = 'any')

In [15]:



class ClassificationMetrics:
    """Confusion counts and accuracy for paired real / predicted 0-1 labels.

    Parameters
    ----------
    real : iterable
        Ground-truth labels.
    predicted : iterable
        Predicted labels, paired with ``real`` position by position
        (``zip`` semantics: extra items of the longer input are ignored).
    target_variable
        Stored on the instance; not read by any method of this class.
    """

    def __init__(self, real, predicted, target_variable):
        self.real = real
        self.predicted = predicted
        self.target_variable = target_variable

    def _classify_points(self):
        """Count matches and mismatches and store the counters on the instance.

        A mismatch whose prediction equals 1 is a "false one", any other
        mismatch a "false zero"; a match whose real label equals 1 is a
        "true one", any other match a "true zero".
        """
        true_ones = true_zeros = false_ones = false_zeros = 0

        for actual, guess in zip(self.real, self.predicted):
            if guess != actual:
                if guess == 1:
                    false_ones += 1
                else:
                    false_zeros += 1
            elif actual == 1:
                true_ones += 1
            else:
                true_zeros += 1

        self.err = false_ones + false_zeros
        self.good = true_ones + true_zeros
        self.false_ones = false_ones
        self.false_zeros = false_zeros
        self.true_ones = true_ones
        self.true_zeros = true_zeros

    def _get_overall_prob(self):
        """Return the share of matching pairs, ``good / (good + err)``.

        Raises ``ZeroDivisionError`` when there are no pairs at all.
        """
        self._classify_points()

        return self.good / (self.good + self.err)

    def _return_false_good_vals(self):
        """Return ``(good, err, false_ones, false_zeros, true_ones, true_zeros)``."""
        self._classify_points()

        return self.good, self.err, self.false_ones, self.false_zeros, self.true_ones, self.true_zeros

    def _confusion_matrix(self):
        """Return ``[[true_zeros, false_ones], [false_zeros, true_ones]]`` as an array.

        The counters are recomputed first, so this works on a fresh instance.
        """
        self._classify_points()
        return np.array([[self.true_zeros, self.false_ones],
                         [self.false_zeros, self.true_ones]])

    def _plot_conf_matrix(self):
        """Draw the confusion matrix as an annotated seaborn heatmap."""
        import seaborn as sns
        # Fixed: the counters used to be read without being computed, which
        # raised AttributeError unless another method had been called before.
        data = self._confusion_matrix()
        sns.heatmap(data, annot=True,  linewidths=.5,cmap='Blues', fmt='g')


In [29]:
def print_full(cpd):
    """Print ``cpd`` with ``TabularCPD``'s table truncation switched off.

    ``TabularCPD._truncate_strtable`` is temporarily replaced by a function
    that returns the table unchanged. The original method is restored in a
    ``finally`` clause so that a failing ``print`` cannot leave the class
    patched for the rest of the session.
    """
    backup = TabularCPD._truncate_strtable
    TabularCPD._truncate_strtable = lambda self, x: x
    try:
        print(cpd)
    finally:
        TabularCPD._truncate_strtable = backup
    

# Two discretisation settings: 12 bins for the frames named `pearson_*`,
# 6 bins for the frames named `hc_*`.
preprocess_pearson = PreprocessDataParameter(
    NODE_SIZE = 5000,
    NUM_BINS = 12,
    NUM_BINS_SMALL = 12
)

preprocess_hc = PreprocessDataParameter(
    NODE_SIZE = 5000,
    NUM_BINS = 6,
    NUM_BINS_SMALL = 6
)

# Frames the models are fitted on. NOTE: `x_test` was loaded from
# ./data/x_train.csv, so these are the training frames despite the name.
pearson_data = preprocess_pearson.bin_data(x_test = x_test)
hc_data = preprocess_hc.bin_data(x_test = x_test)

# Read the evaluation file once instead of once per binning;
# bin_data works on a copy, so both calls can share the same frame.
x_test_miss = pd.read_csv("./data/x_test_miss.csv")

pearson_test = preprocess_pearson.bin_data(x_test = x_test_miss)
hc_test = preprocess_hc.bin_data(x_test = x_test_miss)

## Structure Learning

In [None]:


# Unconstrained hill-climb structure search scored with MLScore.
# NOTE(review): this runs on the raw (un-binned) columns of hc_data, not on
# the discretised 'T', 'R', 'K', 'O' columns — confirm that is intended.
raw_columns = [
    'Teplota vzduchu',
    'Rychlost otáček',
    'Kroutící moment',
    'Opotřebení nástroje',
    'Porucha',
]

gs = HillClimbSearch(hc_data[raw_columns])
ml_model = gs.estimate(
    scoring_method = MLScore(hc_data[raw_columns]),
    max_iter = 150,
    show_progress = True,
)


ml_model.edges()


In [None]:
# Edges that must not / must appear in the learned structure.
black_list = [("Porucha", "O"), ("Porucha", "R"), ("Porucha", "T"), ("Porucha", "K")]
fixed_edges = [('R', 'K')]

# Fixed: the constraints above name the binned columns 'O', 'R', 'T', 'K', but the
# search used to receive only the raw columns ('Teplota vzduchu', ...), so the
# constraints referred to variables that were not in the data. The search now
# runs on the binned columns, matching the models fitted in the later cells.
binned_columns = ['T', 'R', 'K', 'O', 'Porucha']

gs = HillClimbSearch(hc_data[binned_columns])
ml_model = gs.estimate(
    scoring_method = MLScore(hc_data[binned_columns]),
    max_iter = 150,
    show_progress = True,
    black_list = black_list,
    fixed_edges = fixed_edges,
)


ml_model.edges()


In [None]:
# PC structure learning with the 'pearsonr' conditional-independence test at a
# 0.05 significance level, run on the raw (un-binned) columns of pearson_data.
pc_columns = [
    'Teplota vzduchu',
    'Rychlost otáček',
    'Kroutící moment',
    'Opotřebení nástroje',
    'Porucha',
]

pc = PC(data = pearson_data[pc_columns])
pc_model_peaerson = pc.estimate(
    ci_test = 'pearsonr',
    significance_level = 0.05,
)

pc_model_peaerson.edges()

In [None]:
# Maximum-likelihood CPDs for the one-edge network O -> Porucha on the 6-bin data.
model = BayesianNetwork([('O', 'Porucha')])

# One estimator is enough: previously `estimator` was built on a redundant
# BayesianNetwork(model) copy and never used, and a new estimator was
# constructed for every printed CPD.
estimator = MLE(model, hc_data[['K', 'T', 'R', 'O', 'Porucha']])

for node in ('O', 'Porucha'):
    print(estimator.estimate_cpd(node))

In [None]:

# Maximum-likelihood CPDs for the network K -> Porucha, O -> Porucha, R -> K
# on the 6-bin data.
model = BayesianNetwork([('K', 'Porucha'), ('O', 'Porucha'), ('R', 'K')])

# One estimator is enough: previously `estimator` was built on a redundant
# BayesianNetwork(model) copy and never used, and a new estimator was
# constructed for every printed CPD.
estimator = MLE(model, hc_data[['K', 'T', 'R', 'O', 'Porucha']])

for node in ('R', 'K', 'O', 'Porucha'):
    print(estimator.estimate_cpd(node))

In [30]:
# Fit the one-edge network O -> Porucha on the 12-bin frame, predict 'Porucha'
# from 'O' alone on the 12-bin evaluation frame, and report the share of
# correct predictions.
m = BayesianNetwork([("O", "Porucha")])
m.fit(pearson_data[["O", "Porucha"]])

lst = m.predict(pearson_test[["O"]])

metrics = ClassificationMetrics(
    real=pearson_test["Porucha"].values,
    predicted=lst["Porucha"].values,
    target_variable="Porucha",
)

metrics._get_overall_prob()


  0%|          | 0/11 [00:00<?, ?it/s]

0.9738327347357619

In [31]:
# Fit the network K -> Porucha, O -> Porucha, R -> K on the 6-bin frame,
# predict 'Porucha' from 'O', 'K' and 'R' on the 6-bin evaluation frame, and
# report the share of correct predictions.
m = BayesianNetwork([("K", "Porucha"), ("O", "Porucha"), ("R", "K")])
m.fit(hc_data[["O", "Porucha", "K", "R"]])

lst = m.predict(hc_test[["O", "K", "R"]])

metrics = ClassificationMetrics(
    real=hc_test["Porucha"].values,
    predicted=lst["Porucha"].values,
    target_variable="Porucha",
)

metrics._get_overall_prob()

  0%|          | 0/54 [00:00<?, ?it/s]

0.9803481190342505