In [8]:
import pandas as pd
import gower
import copy
from scipy.cluster.hierarchy import dendrogram, linkage, fcluster
import matplotlib.pyplot as plt
from sklearn.metrics import silhouette_score, davies_bouldin_score, calinski_harabasz_score
import numpy as np
from scipy.spatial.distance import squareform
from sklearn.preprocessing import OneHotEncoder

# Data

The first step is to get your data either extract them from the KG or from your project. 

The function of KG serves: if we have some inspection tasks from the project but we don't know the detailed information about them, we can extract the features of the tasks from the KG using Cypher statements and do the work packaging.

In [9]:
df = pd.read_excel('yourdata.xlsx', sheet_name=0, usecols=[1, 3, 5, 7, 9, 11, 12], dtype={'Frequency':str}, keep_default_na=False, na_values=[''])
df.fillna('blank', inplace=True)  # Replace the blank spaces with 'blank'
querydata = df.to_dict('records')
print('len(querydata)={0}'.format(len(querydata)))

seen = set()
unique_querydata = []
for d in querydata:
    t = tuple(sorted(d.items()))
    if t not in seen:
        seen.add(t)
        unique_querydata.append(d)

print(unique_querydata)
print('len(unique_querydata)={0}'.format(len(unique_querydata)))

len(querydata)=605
[{'Inspection Task': 'rebar sampling for test', 'Targeted/related Product': 'rebar/reinforcement bar', 'Corresponding Production Task': 'steel material on factory', 'Frequency': 'as per bd requiement', 'Tool': 'digital vernier scale', 'Person': 'blank', 'Production Phase': 'structure'}, {'Inspection Task': 'corner cast sampling for test', 'Targeted/related Product': 'coner casting', 'Corresponding Production Task': 'steel material on factory', 'Frequency': 'as per bd requiement', 'Tool': 'digital vernier scale', 'Person': 'blank', 'Production Phase': 'structure'}, {'Inspection Task': 'material grade check for board, profile', 'Targeted/related Product': 'promatect hboard', 'Corresponding Production Task': 'board, profile pretreatment and punching', 'Frequency': 'routine', 'Tool': 'mill certificate', 'Person': 'blank', 'Production Phase': 'structure'}, {'Inspection Task': 'material grade check for board, profile', 'Targeted/related Product': 'knauf frp board', 'Corres

In [None]:
data = pd.DataFrame(unique_querydata)
groups = data.groupby('Production Phase')
for phase, group in groups:
    indexed_list = [(index, element) for index, element in enumerate(group['Inspection Task'].values)]
    print('group_len:{0}, group_data:{1}'.format(len(indexed_list), indexed_list))

In [11]:
def fitness(x, group_encoded):

    fit = np.array([0.]*np.size(x, 0)).reshape(-1, 1)
    silh = np.array([0.]*np.size(x, 0)).reshape(-1, 1)
    db = np.array([0.]*np.size(x, 0)).reshape(-1, 1)
    ch = np.array([0.]*np.size(x, 0)).reshape(-1, 1)
    dv = []
    
    for particle in range(np.size(x, 0)):

        weights = x[particle, :6]
        expanded_weights = []
        for weight, count in zip(weights, [264, 143, 138, 10, 33, 5]):
            expanded_weights.extend([weight] * count)
        group_encoded_weighted = group_encoded * np.array(expanded_weights)
        num_clusters = int(x[particle, -1])

        distance_matrix = gower.gower_matrix(group[['Inspection Task', 'Targeted/related Product', 
                                            'Corresponding Production Task', 'Frequency', 
                                            'Tool', 'Person']], weight=weights)
        distance_vector = squareform(distance_matrix)
                
        linked = linkage(distance_vector, method='average')
        labels = fcluster(linked, num_clusters, criterion='maxclust')
                    
        
        if len(np.unique(labels)) > 1:
            silhouette = silhouette_score(distance_matrix, labels, metric='precomputed')
            db_index = davies_bouldin_score(group_encoded_weighted, labels)
            ch_index = calinski_harabasz_score(group_encoded_weighted, labels)

            fit[particle, 0] = (silhouette + 1) / 2 + 1 / (1 + db_index) + 2 / (1 + np.exp(-0.1 * ch_index)) - 1
            silh[particle, 0] = silhouette
            db[particle, 0] = db_index
            ch[particle, 0] = ch_index
            dv.append(distance_vector)
            
    return fit, silh, db, ch, dv  

In [12]:
class GPSO:
    def __init__(self, N, D, w, c1, c2, Xmin, Xmax, Vmax):
        '''
        N is the populaiton size
        D is the dimension of searching space of the problem
        w is the Inertia Weight
        c1 and c2 are acceleration coefficients
        Xmin and Xmax is the boundary of the searching space(1*D dimension)
        Vmax:Particles' velocities on each dimension are clamped to a maximum velocity Vmax(1*D dimension)
        '''
        self.N, self.D, self.w, self.c1, self.c2, self.Xmin, self.Xmax, self.Vmax = N, D, w, c1, c2, Xmin, Xmax, Vmax

    def strict(self, X):
        weights = X[:, :-1]
        max_weights, min_weights = np.max(weights, axis=1), np.min(weights, axis=1)
        reinitial_index = np.argwhere((max_weights / (min_weights + 1e-5)) > 4)
        while len(reinitial_index) > 0:
            for j in range(self.D - 1):
                weights[reinitial_index, j] = np.random.uniform(self.Xmin[0][j], self.Xmax[0][j], (len(reinitial_index), 1))
            max_weights, min_weights = np.max(weights, axis=1), np.min(weights, axis=1)
            reinitial_index = np.argwhere((max_weights / (min_weights + 1e-5)) > 4)
        return X

    def search(self, group_encoded, iterations=1000):
        V = np.zeros((self.N, self.D))
        X = np.zeros((self.N, self.D))
        for j in range(self.D):# Initialize velocity and position for all particles
            V[:,j] = np.random.uniform(-self.Vmax[0][j], self.Vmax[0][j], (self.N,))
            X[:,j] = np.random.uniform(self.Xmin[0][j], self.Xmax[0][j], (self.N,))

        X[:, self.D-1] = np.round(X[:, self.D-1])  

        fit, silh, db, ch, dv = fitness(X, group_encoded)
        pBest = copy.copy(X)  
        best = np.min(fit)
        best_index = np.argmin(fit)
        silh_best, db_best, ch_best = copy.copy(silh[best_index, :]), copy.copy(db[best_index, :]), copy.copy(ch[best_index, :])
        dv_best = copy.copy(dv[best_index])
        gBest = copy.copy(np.reshape(X[best_index, :], (1, self.D)))  

        rand1 = np.random.rand(1, self.D)
        rand2 = np.random.rand(1, self.D)
        iteration = 0
        fail = 0
        collect = {'best':[], 'silh_best':[], 'db_best':[], 'ch_best':[]}
        while iteration < iterations:
            iteration += 1
            collect['best'].append(best)
            collect['silh_best'].append((silh_best + 1) / 2)
            collect['db_best'].append(1 / (db_best + 1))
            collect['ch_best'].append(2 / (1 + np.exp(-0.1 * ch_best)) - 1)
            V1 = self.w * V + self.c1 * rand1 * (pBest - X) + self.c2 * rand2 * (np.tile(gBest,(self.N,1)) - X)
            V1 = np.clip(V1, -self.Vmax, self.Vmax)  
            X1 = X + V1
            X1 = np.clip(X1, self.Xmin, self.Xmax)
            X1[:, self.D-1] = np.round(X1[:, self.D-1])

            X1 = self.strict(X1)
            
            fit1, silh, db, ch, dv = fitness(X1, group_encoded)
            for i in range(self.N):
                if fit1[i] > fit[i]:  
                    pBest[i,:] = copy.copy(X1[i,:])  
            best1 = np.max(fit1)
            best1_index = np.argmax(fit1)

            update = 0
            if best1 > best:
                gBest = copy.copy(np.reshape(X1[best1_index, :], (1, self.D)))  
                silh_best = copy.copy(silh[best1_index, :])
                db_best, ch_best = copy.copy(db[best1_index, :]), copy.copy(ch[best1_index, :])
                dv_best = copy.copy(dv[best1_index])
                best = copy.copy(best1)
                update = 1
            else:
                fail += 1
            if update == 1:
                fail = 0
            if fail > 30:  
                break
            else:
                V = copy.copy(V1)
                X = copy.copy(X1)
        return best, gBest, iteration, collect, silh_best, db_best, ch_best, dv_best

In [None]:
encoder = OneHotEncoder()
encoded_data = encoder.fit_transform(data[['Inspection Task', 'Targeted/related Product', 
                                           'Corresponding Production Task', 'Frequency', 
                                           'Tool', 'Person']]).toarray()

for phase, group in groups:
    group_encoded = encoder.transform(group[['Inspection Task', 'Targeted/related Product', 
                                            'Corresponding Production Task', 'Frequency', 
                                            'Tool', 'Person']]).toarray()  

    print('---------------')
    cycle = 10
    for times in range(cycle):
        
        gpso = GPSO(100, 7, 0.729, 1.49445, 1.49445, Xmin=np.append([0]*6,[2]).reshape(1, -1), 
                    Xmax=np.append([1]*6,[14]).reshape(1, -1), Vmax=np.append([0.1]*6,[1.5]).reshape(1, -1))
        
        best, gBest, iteration, collect, silh_best, db_best, ch_best, dv_best = gpso.search(group_encoded)
        optimal_clusters = int(gBest[0, 6])
        print("max_score={}; optimal_weights={};\noptimal_clusters={}; iterations={}; index={}".format(best, gBest[0, :6], optimal_clusters, iteration, [silh_best.item(), db_best.item(), ch_best.item()]))
    
    plt.figure()
    plt.plot(range(iteration), collect['best'], label='max_score', color='red')
    plt.plot(range(iteration), collect['silh_best'], label='silh_best', color='blue')
    plt.plot(range(iteration), collect['db_best'], label='db_best', color='green')
    plt.plot(range(iteration), collect['ch_best'], label='ch_best', color='yellow')
    plt.legend()
    plt.show()

    linked_best=linkage(dv_best,method='average')

    dendrogram(linked_best, no_labels=True, orientation='top', leaf_rotation=90., leaf_font_size=10., show_leaf_counts=True)
    plt.title(f'Dendrogram for {phase}')
    plt.xlabel('Inspection tasks')
    plt.ylabel('Distance')
    plt.show()
    
    dendrogram(linked_best, p=optimal_clusters,truncate_mode='lastp', orientation='top', leaf_rotation=90., leaf_font_size=10., show_leaf_counts=True)
    plt.title(f'Dendrogram for {phase}')
    plt.xlabel('Inspection tasks')
    plt.ylabel('Distance')
    plt.show()