In [7]:
!pip install wittgenstein

Collecting wittgenstein
[?25l  Downloading https://files.pythonhosted.org/packages/9b/57/c81aa56ee379dac76e7fef745dcbd3b3c692df5699c593078c6fdd71a83f/wittgenstein-0.2.3-py3-none-any.whl (77kB)
[K     |████▏                           | 10kB 16.9MB/s eta 0:00:01[K     |████████▍                       | 20kB 23.1MB/s eta 0:00:01[K     |████████████▋                   | 30kB 27.2MB/s eta 0:00:01[K     |████████████████▉               | 40kB 20.0MB/s eta 0:00:01[K     |█████████████████████           | 51kB 17.0MB/s eta 0:00:01[K     |█████████████████████████▎      | 61kB 17.4MB/s eta 0:00:01[K     |█████████████████████████████▌  | 71kB 12.3MB/s eta 0:00:01[K     |████████████████████████████████| 81kB 6.8MB/s 
Installing collected packages: wittgenstein
Successfully installed wittgenstein-0.2.3


In [8]:
%matplotlib inline
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import GridSearchCV, StratifiedKFold
from sklearn.naive_bayes import GaussianNB, ComplementNB
from sklearn.utils import class_weight
import wittgenstein as lw
import tensorflow as tf
from imblearn.over_sampling import RandomOverSampler
from sklearn.metrics import classification_report,f1_score,confusion_matrix
import copy
import json
import warnings
import seaborn as sn
warnings.filterwarnings('ignore')



In [9]:
# Single place to repoint the notebook at a different copy of the data.
DATA_DIR = '/content/drive/MyDrive/DM/Classification'

# Both files are comma-separated and use their first column as the index.
customers = pd.read_csv(DATA_DIR + '/normalized_data_classification_training_tot.csv', sep=',', index_col=0)
customers_test = pd.read_csv(DATA_DIR + '/normalized_data_classification_test_tot.csv', sep=',', index_col=0)

In [10]:
# Feature matrix: six columns of the training frame, converted to a NumPy array.
x_training = customers.loc[:, [
    "NumBaskets_mean",
    "DistinticProducts_mean",
    "Qta_mean",
    "Qta_entropy",
    "Sale_mean",
    "Sale_entropy",
]].to_numpy()
# Target vector: the values of the "label" column.
y_training = customers.loc[:, "label"].to_numpy()

In [11]:
# Feature matrix: the same six columns as the training set, taken from the test frame.
x_test = customers_test.loc[:, [
    "NumBaskets_mean",
    "DistinticProducts_mean",
    "Qta_mean",
    "Qta_entropy",
    "Sale_mean",
    "Sale_entropy",
]].to_numpy()
# Target vector: the values of the "label" column.
y_test = customers_test.loc[:, "label"].to_numpy()

In [12]:
# Seeded oversampler; the training routines below call ros.fit_resample on
# the binarised labels of each one-vs-rest problem.
ros = RandomOverSampler(random_state=1234)
# Seeded, shuffled, stratified 5-fold splitter consumed by my_cv.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=1234)

## GridSearch and Cross Validation setup

In [None]:
#In order to use IREP as algorithm
# The triple-quoted block below is a string literal, not executed code, so this
# cell only echoes the text back as its output.
# NOTE(review): the grid includes "k", but the other IREP hints in this notebook
# construct lw.IREP with prune_size only -- confirm IREP accepts "k" before enabling.
'''irep_clf = lw.IREP()
param_grid = {"prune_size": [0.5, 0.6], "k": [1, 3, 5]}
grid_search = GridSearchCV(estimator=irep_clf, param_grid=param_grid)
grid_search.fit(x_training, y_training)'''

'irep_clf = lw.IREP()\nparam_grid = {"prune_size": [0.5, 0.6], "k": [1, 3, 5]}\ngrid_search = GridSearchCV(estimator=irep_clf, param_grid=param_grid)\ngrid_search.fit(x_training, y_training)'

In [13]:
def model_prediction(models, x):
  """Merge per-class binary models into one multi-class label vector.

  Args:
    models: iterable of ``(model, label, performance)`` tuples. Each ``model``
      exposes ``predict(x)`` returning one truthy/falsy value per sample; truthy
      means the sample is claimed for ``label``.
    x: the samples to label; only ``len(x)`` and ``model.predict(x)`` are used.

  Returns:
    Integer NumPy array of length ``len(x)``. Every entry starts as 1; models
    are then applied from lowest to highest ``performance``, so when several
    models claim the same sample the best-scoring one decides.
  """
  ranked = sorted(models, key=lambda entry: entry[2])
  print(ranked)
  predicted = np.full((len(x),), 1)
  for clf, clf_label, _ in ranked:
    # Later (better-scoring) models overwrite the claims of earlier ones.
    claimed = np.where(clf.predict(x))
    predicted[claimed] = clf_label
  return predicted


In [14]:
def get_model_score(model,x,y):
  """Return the macro-averaged F1 of a binary model on ``(x, y)``.

  Samples for which ``model.predict(x)`` is truthy are labelled 0, all other
  samples 1; that labelling is compared with ``y`` via ``classification_report``.
  """
  claimed = np.where(model.predict(x))
  binary_labels = np.full((len(y),), 1)
  binary_labels[claimed] = 0
  report = classification_report(y, binary_labels, target_names=['0', '1'], output_dict=True)
  return report['macro avg']['f1-score']

In [None]:
def my_cv(k,prune_size,x_training,y_training,x_test,y_test):
  """Cross-validate a one-vs-rest RIPPER ensemble, scored with weighted F1.

  For each fold produced by the notebook-level ``cv`` splitter, one binary
  RIPPER model is fitted per class: that class is relabelled 0, every other
  class 1, and the result is balanced with the notebook-level ``ros``
  oversampler. The per-class models are merged by ``model_prediction`` and
  scored on the fold's training part, its validation part and the test set.

  Args:
    k: forwarded to ``lw.RIPPER`` as ``k``.
    prune_size: forwarded to ``lw.RIPPER`` as ``prune_size``.
    x_training: training samples, indexable by the fold index arrays.
    y_training: training labels, indexable by the fold index arrays.
    x_test: test samples, scored once per fold.
    y_test: test labels.

  Returns:
    ``(train_scores, validation_scores, test_scores)``: three lists holding
    one weighted F1 score per fold.
  """
  #In order to use IREP algorithm use the line below
  #def my_cv(prune_size,x_training,y_training,x_test,y_test):
  train_scores = []
  validation_scores = []
  test_scores = []
  # The set of classes is the same for every fold, so compute it once.
  class_labels = np.unique(y_training)
  for train_ind, val_ind in cv.split(x_training,y_training):
    training_set_x = x_training[train_ind]
    training_set_y = y_training[train_ind]
    validation_set_x = x_training[val_ind]
    validation_set_y = y_training[val_ind]
    models = []
    for positive_label in class_labels:
      # Binarise: the current class becomes 0 (the positive class), the rest 1.
      is_positive = training_set_y == positive_label
      y_to_fit = training_set_y.copy()
      y_to_fit[is_positive] = 0
      y_to_fit[~is_positive] = 1
      x_to_fit, y_to_fit = ros.fit_resample(training_set_x, y_to_fit)

      ripper = lw.RIPPER(k = k, prune_size = prune_size)
      #In order to use IREP algorithm use the line below
      #ripper = lw.IREP(prune_size = prune_size)
      ripper.fit(x_to_fit, y = y_to_fit,pos_class = 0)

      # Scored on the resampled training data; model_prediction sorts on this value.
      performance = get_model_score(ripper,x_to_fit,y_to_fit)

      models.append((ripper,positive_label,performance))

    training_labels = model_prediction(models,training_set_x)
    train_scores.append(f1_score(training_set_y,training_labels,average='weighted'))

    validation_labels = model_prediction(models,validation_set_x)
    validation_scores.append(f1_score(validation_set_y,validation_labels,average='weighted'))

    test_labels = model_prediction(models,x_test)
    test_scores.append(f1_score(y_test,test_labels,average='weighted'))

  return train_scores,validation_scores,test_scores

In [None]:
# Maps "<k>_<prune_size>" to the cross-validation summary of that configuration.
results = {}

prune_sizes = [0.3,0.5] #[0.3,0.5,0.6]
k_list = [1,5] #[1,3,5,7]
for prune_size in prune_sizes:
  for k in k_list:
    #In order to use IREP algorithm use the line below
    #name = str(prune_size)
    name = '{}_{}'.format(k, prune_size)
    #In order to use IREP algorithm use the line below
    #train,validation,test = my_cv(prune_size,x_training,y_training,x_test,y_test)
    train,validation,test = my_cv(k,prune_size,x_training,y_training,x_test,y_test)

    # Mean and standard deviation of the fold scores, plus the raw per-fold scores.
    result = {
      'training_avg': np.mean(train),
      'validation_avg': np.mean(validation),
      'test_avg': np.mean(test),
      'train_std': np.std(train),
      'validation_std': np.std(validation),
      'test_std': np.std(test),
      'training_scores': train,
      'validation_scores': validation,
      'test_scores': test,
    }

    results[name] = result


## Result evaluation

In [15]:
def plot_confusion_matrix(y_true,y_pred,filename):
  """Draw the 3-class confusion matrix as a heatmap and save it as a PNG.

  Cells are coloured by raw count and annotated with the count divided by
  ``len(y_true)``. Rows are true classes, columns predicted classes, for
  labels 0, 1 and 2. The figure is written to ``filename + '.png'``.
  """
  class_names = ['Low Spend','Medium Spend', 'High Spend']
  counts = confusion_matrix(y_true,y_pred,labels = [0,1,2])
  # Share of all samples that falls into each cell; used only as annotation text.
  fractions = counts / len(y_true)
  counts_frame = pd.DataFrame(counts, index = class_names, columns = class_names)
  plt.figure(figsize = (10,7))
  ax = sn.heatmap(counts_frame, annot=fractions,fmt='.4f',cmap='viridis',cbar = False,square = True)
  ax.set_xlabel('Predicted')
  ax.set_ylabel('True')
  ax.xaxis.set_label_position('top')
  plt.savefig(filename + '.png')

In [16]:
def build_model(x,y,test_x,test_y,k,prune_size):
  """Fit the one-vs-rest RIPPER ensemble on ``(x, y)`` and print its reports.

  One binary RIPPER model is fitted per class in ``y`` (that class relabelled
  0, all others 1, balanced with the notebook-level ``ros`` oversampler). The
  models are merged by ``model_prediction``; a classification report and the
  weighted F1 are printed for the training data and then for the test data.

  Args:
    x: training samples.
    y: training labels (NumPy array; copied, never modified).
    test_x: test samples.
    test_y: test labels.
    k: forwarded to ``lw.RIPPER`` as ``k``.
    prune_size: forwarded to ``lw.RIPPER`` as ``prune_size``.
  """
  models = []
  for positive_label in np.unique(y):
    # Binarise: the current class becomes 0 (the positive class), the rest 1.
    y_to_fit = y.copy()
    y_to_fit[np.where(y == positive_label)] = 0
    y_to_fit[np.where(y != positive_label)] = 1
    x_to_fit, y_to_fit = ros.fit_resample(x, y_to_fit)

    ripper = lw.RIPPER(k = k, prune_size = prune_size)
    ripper.fit(x_to_fit, y = y_to_fit,pos_class = 0)
    # Scored on the resampled training data; model_prediction sorts on this value.
    performance = get_model_score(ripper,x_to_fit,y_to_fit)
    models.append((ripper,positive_label,performance))
  training_labels = model_prediction(models,x)
  # Compare against the `y` argument rather than the notebook-level y_training,
  # so the report matches whatever training set was passed in.
  print(classification_report(y, training_labels))
  print((f1_score(y,training_labels,average='weighted')))
  # plot_confusion_matrix expects (y_true, y_pred, filename).
  #plot_confusion_matrix(y,training_labels,'/content/drive/MyDrive/DM/Classification/Rule_based/Confusion_matricx_train')


  plt.show()

  test_labels = model_prediction(models,test_x)
  print(classification_report(test_y, test_labels))
  print(f1_score(test_y,test_labels,average='weighted'))
  #plot_confusion_matrix(test_y,test_labels,'/content/drive/MyDrive/DM/Classification/Rule_based/Confusion_matricx_test')
  plt.show()

In [None]:
# Final fit and report with the chosen hyper-parameters.
build_model(x_training, y_training, x_test, y_test, k=5, prune_size=0.5)