In [1]:
import numpy as np
import matplotlib.pyplot as plt

In [2]:
# Standardize each column
from sklearn import preprocessing
def standardize(X, wholeImage=False):
    """Standardize data with ``sklearn.preprocessing.scale``.

    Parameters
    ----------
    X : array-like
        With ``wholeImage=False`` it is handed to ``preprocessing.scale``
        unchanged. With ``wholeImage=True`` it is indexed as
        ``X[i, :, :, j]``, so it must be a 4-D NumPy array
        (presumably image, rows, cols, channel -- confirm against callers).
    wholeImage : bool, optional
        When true, every ``X[i, :, :, j]`` plane is scaled on its own.

    Returns
    -------
    numpy.ndarray
        The scaled data; in ``wholeImage`` mode a float array with the
        same shape as ``X``.
    """
    if not wholeImage:
        return preprocessing.scale(X)

    scaled = np.zeros(X.shape, dtype=float)
    # Use the real size of the last axis instead of a hard-coded 3, so
    # inputs with a different channel count are fully processed.
    n_channels = X.shape[3]
    for i in range(X.shape[0]):
        for j in range(n_channels):
            # NOTE(review): scale() is applied to a 2-D plane, so with its
            # default axis each column of the plane is scaled separately,
            # not the plane as a whole -- confirm this is intended.
            scaled[i, :, :, j] = preprocessing.scale(X[i, :, :, j])
    return scaled

In [3]:
from collections import OrderedDict
from os import listdir, path
from os.path import isfile, join
import matplotlib.image as mpimg
import pickle
from IPython.display import display
from skimage.color import rgb2gray
from scipy.ndimage import zoom
def get_data(img_folder, data_path, num_img_per_leaf, test_split, class_names, trailingName, color=True):
    """Load images per class, split them into train/test sets and cache the result.

    Parameters
    ----------
    img_folder : str
        Directory that contains one sub-folder per entry of ``class_names``.
    data_path : str
        Pickle cache file. If it exists it is loaded and returned and
        every other argument is ignored.
    num_img_per_leaf : int
        Number of consecutive images (sorted by numeric ID) that share
        one leaf number.
    test_split : float
        Scales how many leaves go to the test set:
        ``int(test_split * number_of_files / num_img_per_leaf)`` leaves.
    class_names : iterable of str
        Sub-folder names to read.
    trailingName : str
        Suffix stripped from the end of each file name when parsing the
        ID (presumably the file extension -- confirm against callers).
    color : bool, optional
        When false, each image is converted with ``rgb2gray``.

    Returns
    -------
    dict
        ``data['train' | 'test'][class_name][leaf_number][ID] -> image``.
    """
    # Reuse a previously built dataset when the cache file exists.
    if path.exists(data_path):
        # NOTE: unpickling can execute arbitrary code; only use trusted caches.
        with open(data_path, "rb") as cache_file:
            return pickle.load(cache_file)

    data = {'train': {}, 'test': {}}

    for class_name in class_names:
        print('fetching files from', class_name)

        data['train'][class_name] = {}
        data['test'][class_name] = {}

        this_folder = img_folder + '/' + class_name
        file_names = [f for f in listdir(this_folder) if isfile(join(this_folder, f))]

        # Numeric ID -> file name, skipping every file whose name contains "seg".
        # The ID is the text between the first "_" and the trailing suffix.
        id_to_file = {}
        for file_name in file_names:
            if "seg" not in file_name:
                ID = int(file_name[file_name.find("_") + 1:(len(file_name) - len(trailingName))])
                id_to_file[ID] = file_name

        sorted_ids = sorted(id_to_file)

        # Consecutive runs of ``num_img_per_leaf`` sorted IDs share a leaf number.
        leaf_map = {}
        leaf_number = 0
        images_in_leaf = 0
        for ID in sorted_ids:
            leaf_map[ID] = leaf_number
            images_in_leaf += 1
            if images_in_leaf - num_img_per_leaf == 0:
                images_in_leaf = 0
                leaf_number += 1

        # NOTE(review): len(file_names) also counts the skipped "seg" files,
        # so the effective test fraction may be larger than test_split --
        # confirm whether that is intended.
        num_leaves = int(test_split * len(file_names) / num_img_per_leaf)
        num_images = num_img_per_leaf * num_leaves

        # The first ``num_images`` IDs form the test set. Slicing (rather
        # than append-then-check) keeps the test set empty when the computed
        # size is zero; a set gives O(1) membership checks below.
        test_ids = set(sorted_ids[:max(int(num_images), 0)])

        for ID in sorted_ids:
            # NOTE(review): dividing by 255 assumes imread returns 0-255
            # values; confirm for the image format in use.
            img = mpimg.imread(join(this_folder, id_to_file[ID])) / 255.
            if not color:
                img = rgb2gray(img)

            dataset = 'test' if ID in test_ids else 'train'
            data[dataset][class_name].setdefault(leaf_map[ID], {})[ID] = img

    with open(data_path, "wb") as cache_file:
        pickle.dump(data, cache_file)
    return data

In [4]:
# Display randomly selected images
from math import sqrt
from random import randrange
def viewRandomImages(data, nImages, color=True):
    """Plot ``nImages`` randomly chosen images in a square grid.

    Parameters
    ----------
    data : dict
        Nested mapping ``data[className][leafNumber][ID] -> image``.
        Class names must be strings because they are concatenated into
        the axis label.
    nImages : int
        Number of images to draw (drawn with replacement).
    color : bool, optional
        When false, images are shown with the gray colormap.
    """
    # subplot() needs integer grid dimensions; round the side up so the
    # grid also has room when nImages is not a perfect square.
    grid = int(np.ceil(np.sqrt(max(nImages, 0))))

    plt.figure(figsize=(13,10))
    for i in range(nImages):
        plt.subplot(grid, grid, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)

        # Pick a class, then a leaf inside it, then an image inside that leaf.
        classes = list(data)
        rClass = classes[randrange(len(classes))]
        leaves = list(data[rClass])
        rLeaf = leaves[randrange(len(leaves))]
        ids = list(data[rClass][rLeaf])
        rID = ids[randrange(len(ids))]
        rImg = data[rClass][rLeaf][rID]

        if color:
            plt.imshow(rImg)
        else:
            plt.imshow(rImg, cmap=plt.cm.gray)
        plt.xlabel(rClass + '_' + str(rID))
    plt.show()

In [5]:
def gabor_filters(filter_path):
    """Return the Gabor kernel bank, building and caching it if needed, and plot it.

    Parameters
    ----------
    filter_path : str
        Pickle file holding the kernel list. It is loaded when it exists;
        otherwise the bank is generated and written there.

    Returns
    -------
    list
        The kernels. A freshly generated bank holds the real parts of 64
        kernels: 4 orientations x 4 phase offsets x 2 sigmas x 2 frequencies.
    """
    if path.exists(filter_path):
        # NOTE: unpickling can execute arbitrary code; only use trusted caches.
        with open(filter_path, "rb") as cache_file:
            kernels = pickle.load(cache_file)
    else:
        kernels = []
        for theta_step in range(4):
            theta = theta_step / 4. * np.pi
            for offset_step in range(4):
                offset = offset_step / 4. * np.pi
                for sigma in (1, 3):
                    for freq in (0.05, 0.25):
                        kernel = np.real(gabor_kernel(freq, offset=offset, theta=theta, sigma_x=sigma, sigma_y=sigma))
                        kernels.append(kernel)
        with open(filter_path, "wb") as cache_file:
            pickle.dump(kernels, cache_file)

    # Plot every kernel. subplot() needs integer grid dimensions, and the
    # side is rounded up so a cached bank of any size fits.
    grid = int(np.ceil(np.sqrt(len(kernels))))
    plt.figure(figsize=(len(kernels), len(kernels)))
    for i, kernel in enumerate(kernels):
        plt.subplot(grid, grid, i + 1)
        plt.xticks([])
        plt.yticks([])
        plt.grid(False)
        plt.imshow(kernel, cmap=plt.cm.gray)
    plt.show()

    return kernels


# Apply the filters to the images
from scipy import ndimage as ndi
from skimage import data
from skimage.util import img_as_float
from skimage.filters import gabor_kernel
from IPython.display import display, clear_output
def _filter_features(img, filters, color):
    """Return ``[sum, mean, var]`` of the filter response for every filter, flattened."""
    features = []
    for ffilter in filters:
        if color:
            # Pool the responses of the first three channels. They are
            # gathered in a list and concatenated so that no uninitialised
            # placeholder value (np.empty) ends up in the statistics.
            responses = np.concatenate([
                ndi.convolve(img[:, :, channel], ffilter, mode='wrap').ravel()
                for channel in range(3)
            ])
        else:
            responses = ndi.convolve(img, ffilter, mode='wrap')
        features.extend((responses.sum(), responses.mean(), responses.var()))
    return features


def apply_filters(data, data_path, filters, color=True):
    """Convolve every image with every filter and keep summary statistics.

    Parameters
    ----------
    data : dict
        ``data[dataset][className][leaf][ID] -> image``. With
        ``color=True`` images are indexed as ``img[:, :, channel]`` for
        channels 0-2; otherwise they are convolved as-is.
    data_path : str
        Pickle cache file. If it exists it is loaded and returned.
    filters : sequence
        Convolution kernels.
    color : bool, optional
        Whether the images carry a channel axis.

    Returns
    -------
    dict
        Same nesting as ``data``; each image is replaced by a list of
        ``3 * len(filters)`` values: the sum, mean and variance of the
        response to each filter, in filter order.
    """
    if path.exists(data_path):
        # NOTE: unpickling can execute arbitrary code; only use trusted caches.
        with open(data_path, "rb") as cache_file:
            return pickle.load(cache_file)

    filteredData = {}
    for dataset in data:
        filteredData[dataset] = {}
        for className in data[dataset]:
            leaves = data[dataset][className]
            # One updatable display line per class acts as a progress indicator.
            dis = display('aplying filters to: ' + dataset + ', ' + className, display_id=True)
            filteredData[dataset][className] = {}
            for idx, leaf in enumerate(leaves):
                dis.update('aplying filters to: ' + dataset + ', ' + className
                           + f' {100. * float(idx) / float(len(leaves)):.0f}' + "%")
                filteredData[dataset][className][leaf] = {
                    ID: _filter_features(img, filters, color)
                    for ID, img in leaves[leaf].items()
                }
            dis.update('filters applied to: ' + dataset + ', ' + className)

    with open(data_path, "wb") as cache_file:
        pickle.dump(filteredData, cache_file)
    return filteredData

In [6]:
def prepare_data_for_machine_learning(data, data_path):
    """Flatten the nested feature dict into arrays and label lists.

    Parameters
    ----------
    data : dict
        ``data[dataset][className][leaf][ID] -> feature sequence``. The
        dataset named ``'train'`` fills the train outputs; every other
        dataset fills the test outputs.
    data_path : str
        Pickle cache file. If it exists it is loaded and returned.

    Returns
    -------
    dict
        For both ``train`` and ``test``: ``X_*`` (2-D float array passed
        through ``standardize``), ``Y_*`` (class names), ``L_*`` (leaf
        numbers) and ``ID_*`` (image IDs), all aligned row by row.
    """
    if path.exists(data_path):
        # NOTE: unpickling can execute arbitrary code; only use trusted caches.
        with open(data_path, "rb") as cache_file:
            return pickle.load(cache_file)

    # First pass: count the samples per split and read the feature length
    # from the first sample found.
    counts = {'train': 0, 'test': 0}
    nFeatures = 0
    for dataset in data:
        split = 'train' if dataset == 'train' else 'test'
        for className in data[dataset]:
            for leaf in data[dataset][className]:
                samples = data[dataset][className][leaf]
                if nFeatures == 0 and samples:
                    nFeatures = len(next(iter(samples.values())))
                counts[split] += len(samples)

    # Allocate the outputs. Every list of a split is sized with that
    # split's own sample count (ID_test included).
    mlData = {}
    for split in ('train', 'test'):
        mlData['X_' + split] = np.zeros((counts[split], nFeatures), dtype=float)
        mlData['Y_' + split] = [None] * counts[split]
        mlData['L_' + split] = [None] * counts[split]
        mlData['ID_' + split] = [None] * counts[split]

    # Second pass: copy features and labels row by row.
    next_row = {'train': 0, 'test': 0}
    for dataset in data:
        split = 'train' if dataset == 'train' else 'test'
        for className in data[dataset]:
            for leaf in data[dataset][className]:
                for ID, x in data[dataset][className][leaf].items():
                    row = next_row[split]
                    mlData['X_' + split][row, :len(x)] = x
                    mlData['Y_' + split][row] = className
                    mlData['L_' + split][row] = leaf
                    mlData['ID_' + split][row] = ID
                    next_row[split] = row + 1

    # NOTE(review): train and test are standardized independently, each
    # with its own statistics -- confirm this is intended rather than
    # reusing the training statistics for the test set.
    mlData['X_train'] = standardize(mlData['X_train'])
    mlData['X_test'] = standardize(mlData['X_test'])

    with open(data_path, "wb") as cache_file:
        pickle.dump(mlData, cache_file, protocol=4)
    return mlData

In [7]:
# Run the various machine-learning methods and collect their accuracy results
import warnings
def mla_class_results(mlas, X, Y, L, results_path, disp=True):
    """Evaluate every named learner with ``cross_validation_class`` and cache the scores.

    Parameters
    ----------
    mlas : iterable of str
        Learner names; each is forwarded to ``cross_validation_class``.
    X, Y, L
        Features, class labels and leaf labels, forwarded unchanged.
    results_path : str
        Pickle cache file. If it exists it is loaded and returned without
        training anything.
    disp : bool, optional
        Forwarded to ``cross_validation_class``.

    Returns
    -------
    dict
        Learner name -> whatever ``cross_validation_class`` returned for it.
    """
    if path.exists(results_path):
        # NOTE: unpickling can execute arbitrary code; only use trusted caches.
        with open(results_path, "rb") as cache_file:
            return pickle.load(cache_file)

    # Silence warnings raised when a model does not converge. This changes
    # the process-wide warning filter and is not restored afterwards.
    warnings.filterwarnings('ignore')

    results = {}
    for mla in mlas:
        results[mla] = cross_validation_class(mla, X, Y, L, disp)

    with open(results_path, "wb") as cache_file:
        pickle.dump(results, cache_file)
    return results

In [11]:
# X: training set  Y: class labels  nFolds: number of cross-validation folds
from sklearn.tree import DecisionTreeClassifier 
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import cross_val_score
from random import randint
from sklearn.metrics import accuracy_score
def cross_validation(mla, X, Y, L, nFolds=3):
    """Leaf-grouped cross-validation returning one pooled accuracy.

    Distinct values of ``L`` are randomly partitioned into ``nFolds``
    groups. For each group, every row whose leaf label is in the group is
    held out, ``mla`` is fitted on the remaining rows, and the held-out
    rows are predicted. Rows sharing a leaf label are therefore never
    split between the fit and the prediction side of a fold.

    Parameters
    ----------
    mla : object
        Estimator providing ``fit(X, y)`` and ``predict(X)``.
    X : array-like
        Feature rows.
    Y : sequence
        Class label for every row.
    L : sequence
        Leaf label for every row (must be hashable).
    nFolds : int, optional
        Number of folds (default 3). The first ``nFolds - 1`` folds get
        ``int(n_leaves / nFolds)`` leaves each; the last fold takes the rest.

    Returns
    -------
    float
        ``accuracy_score`` over the held-out predictions of all folds.

    Raises
    ------
    ValueError
        If ``nFolds`` is smaller than 1.
    """
    if nFolds < 1:
        raise ValueError("nFolds must be at least 1")

    leafs = list(set(L))
    Y = np.array(Y)
    nPer = int(len(leafs) / nFolds)

    # Row indices per leaf, in ascending order, built once instead of
    # rescanning L for every held-out leaf.
    rows_by_leaf = {}
    for idx, leaf in enumerate(L):
        rows_by_leaf.setdefault(leaf, []).append(idx)

    pred_parts = []
    true_parts = []
    for f in range(nFolds):
        if f < nFolds - 1:
            # Draw nPer leaves without replacement from the remaining pool.
            fLeafs = []
            for _ in range(nPer):
                rIdx = randint(0, len(leafs) - 1)
                fLeafs.append(leafs.pop(rIdx))
        else:
            # The last fold takes every leaf that is left.
            fLeafs = leafs

        fRows = [idx for fLeaf in fLeafs for idx in rows_by_leaf[fLeaf]]

        mla.fit(np.delete(X, fRows, 0), np.delete(Y, fRows, 0))
        pred_parts.append(mla.predict(np.take(X, fRows, 0)))
        true_parts.append(np.take(Y, fRows, 0))

    return accuracy_score(np.concatenate(true_parts), np.concatenate(pred_parts))

def cross_validation_class(name, X, Y, L, disp=True):
    """Run ``cross_validation`` 100 times for the learner selected by ``name``.

    The learner is chosen by the first tag contained in ``name``, checked
    in this order: DTr, LRe, NBa, RFo, ETr, SVM, KNN, MLP. A new estimator
    is built for every run; those that accept ``random_state`` receive
    the run index. If no tag matches, one default decision tree created
    up front is reused for all runs.

    Parameters
    ----------
    name : str
        Learner name containing one of the tags above.
    X, Y, L
        Forwarded unchanged to ``cross_validation``.
    disp : bool, optional
        When true, progress is shown through an updatable display handle.

    Returns
    -------
    list
        The 100 values returned by ``cross_validation``, in run order.
    """
    n_runs = 100

    # (tag, factory) pairs; order matters because the first match wins.
    # Factories are lazy so only the selected estimator is ever constructed.
    factories = (
        ('DTr', lambda seed: DecisionTreeClassifier(random_state = seed)),
        ('LRe', lambda seed: LogisticRegression()),
        ('NBa', lambda seed: GaussianNB()),
        ('RFo', lambda seed: RandomForestClassifier(random_state = seed, n_estimators=100)),
        ('ETr', lambda seed: ExtraTreesClassifier(random_state = seed, n_estimators=100)),
        ('SVM', lambda seed: SVC(random_state = seed, gamma='auto', probability=True)),
        ('KNN', lambda seed: KNeighborsClassifier(n_neighbors=2)),
        ('MLP', lambda seed: MLPClassifier(random_state = seed, hidden_layer_sizes=[32], max_iter=10000)),
    )

    # Estimator used for every run when ``name`` matches no tag.
    fallback = DecisionTreeClassifier()

    handle = display('Training ML: ' + name, display_id=True) if disp else None

    build = next((make for tag, make in factories if tag in name), None)

    means = []
    for run in range(n_runs):
        if disp:
            handle.update('Training ML: ' + name + f' {run:.0f}%')
        estimator = fallback if build is None else build(run)
        means.append(cross_validation(estimator, X, Y, L))

    if disp:
        handle.update('Trained ML: ' + name + ' with ' + "{:.2f}".format(sum(means) * 100 / len(means)) + '% average accuracy')

    return means