# Start From Here

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [None]:
import numpy as np
import math

In [None]:
import pandas as pd
tr_wkt = pd.read_csv('/content/drive/My Drive/GP/dstl/train_wkt_v4.csv')  # Well-known text Multipolygon
grid_sizes = pd.read_csv('/content/drive/My Drive/GP/dstl/grid_sizes.csv', names=['ImageId', 'Xmax', 'Ymin'], skiprows=1)

def convert_coordinates(coords, img_size, xy_coords):
    '''
    Convert all image coordinates given range
    '''
    # https://www.kaggle.com/visoft/dstl-satellite-imagery-feature-detection/export-pixel-wise-mask
    Xmax, Ymin = xy_coords              # Xmax - maximum X coordinate for the image
                                        # Ymin - minimum Y coordinate for the image
    Height, Weight = img_size            # image sizes
    W = 1.0 * Weight * Weight / (Weight + 1)
    H = 1.0 * Height * Height / (Height + 1)
    xf = W / Xmax
    yf = H / Ymin
    coords[:, 1] *= yf
    coords[:, 0] *= xf
    coords_int = np.round(coords).astype(np.int32)
    return coords_int
def get_xmax_ymin(GS, imageId):
    '''
    Find maximum coordinates for x and minimun coordinates for y
    ''' 
    xmax, ymin = GS[GS.ImageId == imageId].iloc[0, 1:].astype(float)
    return (xmax, ymin)
def get_polygon_list(wkt_list, imageId, cType):
    '''
    Find polygon list for all images with there class label
    '''
    # https://www.kaggle.com/visoft/dstl-satellite-imagery-feature-detection/export-pixel-wise-mask
    df_image = wkt_list[wkt_list.ImageId == imageId]
    multipoly_def = df_image[df_image.ClassType == cType].MultipolygonWKT
    polygonList = None
    if len(multipoly_def) > 0:
        assert len(multipoly_def) == 1
        polygonList = loads(multipoly_def.values[0])
    return polygonList
def get_and_convert_contours(polygonList, img_size, xy_coods):
    # https://www.kaggle.com/visoft/dstl-satellite-imagery-feature-detection/export-pixel-wise-mask
    perim_list = []
    interior_list = []
    if polygonList is None:
        return None
    for k in range(len(polygonList)):
        poly = polygonList[k]
        perim = np.array(list(poly.exterior.coords))
        perim_c = convert_coordinates(perim, img_size, xy_coods)
        perim_list.append(perim_c)
        for pi in poly.interiors:
            interior = np.array(list(pi.coords))
            interior_c = convert_coordinates(interior, img_size, xy_coods)
            interior_list.append(interior_c)
    return perim_list, interior_list
def plot_mask_from_contours(img_size, contours):
    # https://www.kaggle.com/visoft/dstl-satellite-imagery-feature-detection/export-pixel-wise-mask
    img_mask = np.zeros(img_size, np.uint8)
    if contours is None:
        return img_mask
    perim_list, interior_list = contours
    cv2.fillPoly(img_mask, perim_list,1)    
    cv2.fillPoly(img_mask, interior_list,0)
    return img_mask
def generate_mask_for_image_and_class(img_size, imageId, class_type, GS=grid_sizes, wkt_list=tr_wkt):
    '''
    Fill polygons exterior and interior points and return mask of images
    '''
    xy_coods = get_xmax_ymin(GS, imageId)
    polygon_list = get_polygon_list(wkt_list, imageId, class_type)
    contours = get_and_convert_contours(polygon_list, img_size, xy_coods)
    mask = plot_mask_from_contours(img_size, contours)
    return mask

In [None]:
def computeNormal2D(mat): #input must be a 2D array
    amin = np.amin(mat)
    mat = (mat - amin) / (np.amax(mat) - amin)
    return mat

def computeNormal3D(mat): #input must be a 3D array
    for i in range(mat.shape[2]):
        mat[:,:,i] = computeNormal2D(mat[:,:,i])
    return mat

def computeSMoment(mat,mean=None): #input must be a 2D array
  if mean == None:
    mean = mat.mean()
  return math.sqrt(mean)/np.square(mat).mean()

def convertRGB2HSI(img):
    img = img/1023.0
    imgHSI = img
    for i in range(len(img)):
        for j in range(len(img[0])):
            cmax = max(img[i][j][0],img[i][j][1],img[i][j][2])
            cmin = min(img[i][j][0],img[i][j][1],img[i][j][2])
            delta = cmax-cmin

            if cmax==cmin:
                imgHSI[i][j][0] = 0
            elif cmax == img[i][j][0]:
                imgHSI[i][j][0] = (60 * ((img[i][j][1] - img[i][j][2]) / delta) + 360) % 360
            elif cmax == img[i][j][1]:
                imgHSI[i][j][0] = (60 * ((img[i][j][2] - img[i][j][0]) / delta) + 120) % 360
            elif cmax == img[i][j][2]:
                imgHSI[i][j][0] = (60 * ((img[i][j][0] - img[i][j][1]) / delta) + 240) % 360
      
            if cmax==0:
                imgHSI[i][j][1]=0
            else:
                imgHSI[i][j][1]=(delta/cmax)
      
            imgHSI[i][j][2]=cmax

    return imgHSI

def computeCCMFeatures(img,band,windowSize = 7):

    if windowSize % 2 == 0:
        return
    paramCount = 0
    if band == 'H':
        paramCount = 3
        img = img*255/360
    elif band == 'S':
        paramCount = 2
        img = img*255
    elif band == 'I':
        paramCount = 2
        img = img*255
    else:
        return
  
    img = np.around(img)  
    img = img.astype(np.uint8)
  
    newImg = np.zeros((478,478,paramCount))
    halfWindow=windowSize//2
    ccm = np.zeros((256,256), dtype=np.uint8) #assuming windowSize <= 15
    a = 0
    b = 0
    for i in range(halfWindow,img.shape[0]-halfWindow,windowSize):
        for j in range(halfWindow,img.shape[1]-halfWindow,windowSize):
            # i,j = pixel in whole-image loop
            ccm = np.zeros((256,256), dtype=np.uint8) #assuming windowSize <= 15
            for k in range(i-halfWindow,min(img.shape[0]-1,i+halfWindow+1)):
                for l in range(j-halfWindow,min(img.shape[1]-1,j+halfWindow+1)):
                    # k,l = pixel in sliding window loop
                    ccm[img[k,l],img[k+1,l+1]] += 1
            mean = ccm.mean()
            newImg[a][b][0] = mean
            ccmSquared = np.square(ccm)
        
            if band == 'H':
                #sosvh is SSD
                window= img[(i-halfWindow):min(img.shape[0]-1,i+halfWindow+1), (j-halfWindow):min(img.shape[1]-1,j+halfWindow+1)]
                newImg[a][b][1]=np.sum(np.square(window-mean)) #sosvh
                newImg[a][b][2]=np.sum(ccmSquared) #autoc
            else:
                newImg[a][b][1]=(math.sqrt(mean))/(ccmSquared.mean()) #smoment
                #if band == 'I':
                #  covariance = np.cov(ccm)[0][-1] ###### Check if it works for a matrix and replace with my function
                #  newImg[a][b][2]=covariance
            b = (b+1)%478
        a += 1
    return newImg


def computeHSIFeatures(img,windowSize = 7):
    paramCount = 7
    newImg = np.zeros((478,478,paramCount))
    halfWindow=windowSize//2
    a = 0
    b = 0
    for i in range(halfWindow,img.shape[0]-halfWindow,7):
        for j in range(halfWindow,img.shape[1]-halfWindow,7):
            # i,j = pixel in whole-image loop      
        
            minX=i-halfWindow
            maxX=i+halfWindow+1
            minY=j-halfWindow
            maxY=j+halfWindow+1

            newImg[a][b][4] = img[minX:maxX,minY:maxY,0].mean() #meanH
            newImg[a][b][6] = img[minX:maxX,minY:maxY,1].mean() #meanS
            newImg[a][b][5] = img[minX:maxX,minY:maxY,2].mean() #meanI
            newImg[a][b][0] = computeSMoment(img[minX:maxX,minY:maxY,2], newImg[a][b][5]) #smomentI
            newImg[a][b][1] = np.var(img[minX:maxX,minY:maxY,2]) #varianceI
            newImg[a][b][2] = math.sqrt(newImg[a][b][1]) #stdI
            newImg[a][b][3] = np.std(img[minX:maxX,minY:maxY,0]) #stdH
            b = (b+1)%478
        a += 1
        
    return newImg

def computeRGBNFeatures(img,windowSize = 7):
    paramCount = 2
    newImg = np.zeros((478,478,paramCount))
    halfWindow=windowSize//2
    a = 0
    b = 0
    for i in range(halfWindow,img.shape[0]-halfWindow,7):
        for j in range(halfWindow,img.shape[1]-halfWindow,7):
            # i,j = pixel in whole-image loop      
            
            minX=i-halfWindow
            maxX=i+halfWindow+1
            minY=j-halfWindow
            maxY=j+halfWindow+1
            
            newImg[a][b][1] = img[minX:maxX,minY:maxY].mean() #meanNIR
            newImg[a][b][0] = math.sqrt(np.var(img[minX:maxX,minY:maxY])) #stdNIR
            b = (b+1)%478
        a += 1
    
    return newImg

In [None]:
import tifffile as tiff
import cv2
from shapely.wkt import loads

N_Cls=10

def all_image_mod(image_id):
    imgRGBN = np.zeros((3346, 3346, 4), "float32")
    # for type M 
    imgRGBN[..., 3] = cv2.resize(np.transpose(tiff.imread("/content/drive/My Drive/GP/dstl/sixteen_band/{}_M.tif".format(image_id)), (1,2,0))[:,:,7], (3346, 3346))
    #img_NIR = cv2.resize(img_NIR, (3346, 3346)) #stretch to fit 7*7

    # for RGB
    imgRGBN[..., 0:3] = cv2.resize(np.moveaxis(tiff.imread("/content/drive/My Drive/GP/dstl/three_band/{}.tif".format(image_id)),0,-1), (3346, 3346)) #compress to fit 7*7
    #img_RGB = img_RGB[:3347,:3347,:] 
    
    #print(f'RGB shape: {img_RGB.shape}\nM shape: {img_M.shape}\nimg shape: {img.shape}')
    
    #imgRGBN = stretch_n(imgRGBN)
    imgHSI = convertRGB2HSI(imgRGBN[:,:,0:3])
    img = np.zeros((478,478,16)) 
    
    temp = computeCCMFeatures(imgHSI[:,:,0],'H')
    print("ccmH")
    img[:,:,4]=temp[:,:,0]
    img[:,:,1]=temp[:,:,1]
    img[:,:,2]=temp[:,:,2]

    temp = computeCCMFeatures(imgHSI[:,:,1],'S')
    print("ccmS")
    img[:,:,3]=temp[:,:,0]
    img[:,:,5]=temp[:,:,1]

    temp = computeCCMFeatures(imgHSI[:,:,2],'I')
    print("ccmI")
    img[:,:,0]=temp[:,:,0]
    img[:,:,6]=temp[:,:,1]

    temp = computeHSIFeatures(imgHSI)
    print("HSI")
    img[:,:,7:9]=temp[:,:,0:2]
    img[:,:,10:15]=temp[:,:,2:7]

    temp = computeRGBNFeatures(imgRGBN[...,3])
    print("RGBN")
    img[:,:,9]=temp[:,:,0]
    img[:,:,15]=temp[:,:,1]

    return img

def stick_all_train_mod():
    s = 478 #478 is my image size
    x = np.zeros((5 * s, 5 * s, 16))
    #x = np.load((chosen_path+'x_trn_%d.npy') % N_Cls)
    y = np.zeros((5 * s, 5 * s, N_Cls))

    ids = sorted(tr_wkt.ImageId.unique())
    print(len(ids))
    for i in range(5):
        for j in range(5):
            if np.amax(x[s * i:s * i + s, s * j:s * j + s, :]) == 0:
                id = ids[5 * i + j]
                img = all_image_mod(id)
                #print(img.shape, id, np.amax(img), np.amin(img))
                x[s * i:s * i + s, s * j:s * j + s, :] = img[:s, :s, :]
                np.save((chosen_path+'x_trn_%d') % N_Cls, x)
                for z in range(N_Cls):
                    y[s * i:s * i + s, s * j:s * j + s, z] = generate_mask_for_image_and_class((s, s), id, z + 1)
            print(5 * i + j)
                        

    #print(np.amax(y), np.amin(y))

    np.save((chosen_path+'x_trn_%d') % N_Cls, x)
    np.save((chosen_path+'y_trn_%d') % N_Cls, y)



In [None]:
'''
def computeLastCCMFeature(img,windowSize = 7):
    img = np.around(img)  
    img = img.astype(np.uint8)
  
    newImg = np.zeros((478,478))
    halfWindow=windowSize//2
    ccm = np.zeros((256,256), dtype=np.uint8) #assuming windowSize <= 15
    a = 0
    b = 0
    for i in range(halfWindow,img.shape[0]-halfWindow,windowSize):
        for j in range(halfWindow,img.shape[1]-halfWindow,windowSize):
            # i,j = pixel in whole-image loop
            ccm = np.zeros((256,256), dtype=np.uint8) #assuming windowSize <= 15
            for k in range(i-halfWindow,min(img.shape[0]-1,i+halfWindow+1)):
                for l in range(j-halfWindow,min(img.shape[1]-1,j+halfWindow+1)):
                    # k,l = pixel in sliding window loop
                    ccm[img[k,l],img[k+1,l+1]] += 1
            
            
            newImg[a][b]=np.sum(np.square(img[(i-halfWindow):min(img.shape[0]-1,i+halfWindow+1), (j-halfWindow):min(img.shape[1]-1,j+halfWindow+1)]-ccm.mean())) #sosvh
            
            b = (b+1)%478
        a += 1
    return newImg
def all_image_last(image_id):
    imgRGBN = np.zeros((3346, 3346, 3), "float32")
    # for type M 
    #imgRGBN[..., 3] = cv2.resize(np.transpose(tiff.imread("/content/drive/My Drive/GP/dstl/sixteen_band/{}_M.tif".format(image_id)), (1,2,0))[:,:,7], (3346, 3346))
    #img_NIR = cv2.resize(img_NIR, (3346, 3346)) #stretch to fit 7*7

    # for RGB
    imgRGBN[..., 0:3] = cv2.resize(np.moveaxis(tiff.imread("/content/drive/My Drive/GP/dstl/three_band/{}.tif".format(image_id)),0,-1), (3346, 3346)) #compress to fit 7*7
    #img_RGB = img_RGB[:3347,:3347,:] 
    
    #print(f'RGB shape: {img_RGB.shape}\nM shape: {img_M.shape}\nimg shape: {img.shape}')
    
    img = computeLastCCMFeature(convertRGB2HSI(imgRGBN[:,:,0:3])[:,:,0])

    return img

def stick_all_train_last():
    s = 478
    x = np.zeros((5 * s, 5 * s))

    ids = sorted(tr_wkt.ImageId.unique())
    for i in range(5):
        for j in range(5):
            #if np.amax(x[s * i:s * i + s, s * j:s * j + s, :]) == 0:
            id = ids[5 * i + j]
            img = all_image_last(id)
            print(img.shape, id, np.amax(img), np.amin(img))
            x[s * i:s * i + s, s * j:s * j + s] = img[:s, :s]
            np.save((chosen_path+'x_whole_10_478_last'), x)
            print(5 * i + j)
                            

    print(np.amax(x), np.amin(x))
'''

'\ndef computeLastCCMFeature(img,windowSize = 7):\n    img = np.around(img)  \n    img = img.astype(np.uint8)\n  \n    newImg = np.zeros((478,478))\n    halfWindow=windowSize//2\n    ccm = np.zeros((256,256), dtype=np.uint8) #assuming windowSize <= 15\n    a = 0\n    b = 0\n    for i in range(halfWindow,img.shape[0]-halfWindow,windowSize):\n        for j in range(halfWindow,img.shape[1]-halfWindow,windowSize):\n            # i,j = pixel in whole-image loop\n            ccm = np.zeros((256,256), dtype=np.uint8) #assuming windowSize <= 15\n            for k in range(i-halfWindow,min(img.shape[0]-1,i+halfWindow+1)):\n                for l in range(j-halfWindow,min(img.shape[1]-1,j+halfWindow+1)):\n                    # k,l = pixel in sliding window loop\n                    ccm[img[k,l],img[k+1,l+1]] += 1\n            \n            \n            newImg[a][b]=np.sum(np.square(img[(i-halfWindow):min(img.shape[0]-1,i+halfWindow+1), (j-halfWindow):min(img.shape[1]-1,j+halfWindow+1)]-ccm.mean(

In [None]:
chosen_path = "/content/drive/My Drive/GP/dstl/walid/"

In [None]:
#stick_all_train_last()

In [None]:
#stick_all_train_mod()

In [None]:
%tensorflow_version 1.x

TensorFlow 1.x selected.


In [None]:
import tensorflow as tf
from abc import ABCMeta, abstractmethod
from scipy.stats import truncnorm
#from sklearn.base import BaseEstimator, TransformerMixin, ClassifierMixin

def weight_variable(func, shape, stddev, dtype=tf.float32):
    initial = func(shape, stddev=stddev, dtype=dtype)
    return tf.Variable(initial)

def bias_variable(value, shape, dtype=tf.float32):
    initial = tf.constant(value, shape=shape, dtype=dtype)
    return tf.Variable(initial)

def batch_generator(batch_size, data, labels=None):
    """
    Generates batches of samples
    :param data: array-like, shape = (n_samples, n_features)
    :param labels: array-like, shape = (n_samples, )
    :return:
    """
    n_batches = int(np.ceil(len(data) / float(batch_size)))
    idx = np.random.permutation(len(data))
    data_shuffled = data[idx]
    if labels is not None:
        labels_shuffled = labels[idx]
    for i in range(n_batches):
        start = i * batch_size
        end = start + batch_size
        if labels is not None:
            yield data_shuffled[start:end, :], labels_shuffled[start:end]
        else:
            yield data_shuffled[start:end, :]


def to_categorical(labels, num_classes):
    """
    Converts labels as single integer to row vectors. For instance, given a three class problem, labels would be
    mapped as label_1: [1 0 0], label_2: [0 1 0], label_3: [0, 0, 1] where labels can be either int or string.
    :param labels: array-like, shape = (n_samples, )
    :return:
    """
    new_labels = np.zeros([len(labels), num_classes])
    label_to_idx_map, idx_to_label_map = dict(), dict()
    idx = 0
    for i, label in enumerate(labels):
        if label not in label_to_idx_map:
            label_to_idx_map[label] = idx
            idx_to_label_map[idx] = label
            idx += 1
        new_labels[i][label_to_idx_map[label]] = 1
    return new_labels, label_to_idx_map, idx_to_label_map


In [None]:
class BinaryRBM():
    """
    This class implements a Binary Restricted Boltzmann machine.
    """

    def __init__(self,
                 n_hidden_units=100,
                 activation_function='sigmoid',
                 optimization_algorithm='sgd',
                 learning_rate=1e-3,
                 n_epochs=10,
                 contrastive_divergence_iter=1,
                 batch_size=32,
                 verbose=True):
        self.n_hidden_units = n_hidden_units
        self.activation_function = activation_function
        self.optimization_algorithm = optimization_algorithm
        self.learning_rate = learning_rate
        self.n_epochs = n_epochs
        self.contrastive_divergence_iter = contrastive_divergence_iter
        self.batch_size = batch_size
        self.verbose = verbose

    def fit(self, X):
        """
        Fit a model given data.
        :param X: array-like, shape = (n_samples, n_features)
        :return:
        """
        self.n_visible_units = X.shape[1]

        # Initialize RBM parameters
        self._build_model()

        sess.run(tf.variables_initializer([self.W, self.c, self.b]))

        if self.optimization_algorithm == 'sgd':
            self._stochastic_gradient_descent(X)
        else:
            raise ValueError("Invalid optimization algorithm.")
        return
    
    @classmethod
    def _get_weight_variables_names(cls):
        return ['W', 'c', 'b']

    @classmethod
    def _get_param_names(cls):
        return ['n_hidden_units',
                'n_visible_units',
                'activation_function',
                'optimization_algorithm',
                'learning_rate',
                'n_epochs',
                'contrastive_divergence_iter',
                'batch_size',
                'verbose',
                '_activation_function_class']

    def _initialize_weights(self, weights):
        if weights:
            for attr_name, value in weights.items():
                self.__setattr__(attr_name, tf.Variable(value))
        else:
            if self.activation_function == 'sigmoid':
                stddev = 1.0 / np.sqrt(self.n_visible_units)
                self.W = weight_variable(tf.random_normal, [self.n_hidden_units, self.n_visible_units], stddev)
                self.c = weight_variable(tf.random_normal, [self.n_hidden_units], stddev)
                self.b = weight_variable(tf.random_normal, [self.n_visible_units], stddev)
                self._activation_function_class = tf.nn.sigmoid
            elif self.activation_function == 'relu':
                stddev = 0.1 / np.sqrt(self.n_visible_units)
                self.W = weight_variable(tf.truncated_normal, [self.n_hidden_units, self.n_visible_units], stddev)
                self.c = bias_variable(stddev, [self.n_hidden_units])
                self.b = bias_variable(stddev, [self.n_visible_units])
                self._activation_function_class = tf.nn.relu
            else:
                raise ValueError("Invalid activation function.")

    def _build_model(self, weights=None):
        """
        Builds TensorFlow model.
        :return:
        """
        # initialize weights and biases
        self._initialize_weights(weights)

        # TensorFlow operations
        self.visible_units_placeholder = tf.placeholder(tf.float32, shape=[None, self.n_visible_units])
        self.compute_hidden_units_op = self._activation_function_class(
            tf.transpose(tf.matmul(self.W, tf.transpose(self.visible_units_placeholder))) + self.c)
        self.hidden_units_placeholder = tf.placeholder(tf.float32, shape=[None, self.n_hidden_units])
        self.compute_visible_units_op = self._activation_function_class(
            tf.matmul(self.hidden_units_placeholder, self.W) + self.b)
        self.random_uniform_values = tf.Variable(tf.random_uniform([self.batch_size, self.n_hidden_units]))
        sample_hidden_units_op = tf.to_float(self.random_uniform_values < self.compute_hidden_units_op)
        self.random_variables = [self.random_uniform_values]

        # Positive gradient
        # Outer product. N is the batch size length.
        # From http://stackoverflow.com/questions/35213787/tensorflow-batch-outer-product
        positive_gradient_op = tf.matmul(tf.expand_dims(sample_hidden_units_op, 2),  # [N, U, 1]
                                         tf.expand_dims(self.visible_units_placeholder, 1))  # [N, 1, V]

        # Negative gradient
        # Gibbs sampling
        sample_hidden_units_gibbs_step_op = sample_hidden_units_op
        for t in range(self.contrastive_divergence_iter):
            compute_visible_units_op = self._activation_function_class(
                tf.matmul(sample_hidden_units_gibbs_step_op, self.W) + self.b)
            compute_hidden_units_gibbs_step_op = self._activation_function_class(
                tf.transpose(tf.matmul(self.W, tf.transpose(compute_visible_units_op))) + self.c)
            random_uniform_values = tf.Variable(tf.random_uniform([self.batch_size, self.n_hidden_units]))
            sample_hidden_units_gibbs_step_op = tf.to_float(random_uniform_values < compute_hidden_units_gibbs_step_op)
            self.random_variables.append(random_uniform_values)

        negative_gradient_op = tf.matmul(tf.expand_dims(sample_hidden_units_gibbs_step_op, 2),  # [N, U, 1]
                                         tf.expand_dims(compute_visible_units_op, 1))  # [N, 1, V]

        compute_delta_W = tf.reduce_mean(positive_gradient_op - negative_gradient_op, 0)
        compute_delta_b = tf.reduce_mean(self.visible_units_placeholder - compute_visible_units_op, 0)
        compute_delta_c = tf.reduce_mean(sample_hidden_units_op - sample_hidden_units_gibbs_step_op, 0)

        self.update_W = tf.assign_add(self.W, self.learning_rate * compute_delta_W)
        self.update_b = tf.assign_add(self.b, self.learning_rate * compute_delta_b)
        self.update_c = tf.assign_add(self.c, self.learning_rate * compute_delta_c)

    def to_dict(self):
        dct_to_save = {name: self.__getattribute__(name) for name in self._get_param_names()}
        dct_to_save.update(
            {name: self.__getattribute__(name).eval(sess) for name in self._get_weight_variables_names()})
        return dct_to_save

    @classmethod
    def from_dict(cls, dct_to_load):
        weights = {var_name: dct_to_load.pop(var_name) for var_name in cls._get_weight_variables_names()}

        _activation_function_class = dct_to_load.pop('_activation_function_class')
        n_visible_units = dct_to_load.pop('n_visible_units')

        instance = cls(**dct_to_load)
        setattr(instance, '_activation_function_class', _activation_function_class)
        setattr(instance, 'n_visible_units', n_visible_units)

        # Initialize RBM parameters
        instance._build_model(weights)
        sess.run(tf.variables_initializer([getattr(instance, name) for name in cls._get_weight_variables_names()]))

        return instance
    
    def transform(self, X):
        """
        Transforms data using the fitted model.
        :param X: array-like, shape = (n_samples, n_features)
        :return:
        """
        if len(X.shape) == 1:  # It is a single sample
            return self._compute_hidden_units(X)
        transformed_data = self._compute_hidden_units_matrix(X)
        return transformed_data

    def _reconstruct(self, transformed_data):
        """
        Reconstruct visible units given the hidden layer output.
        :param transformed_data: array-like, shape = (n_samples, n_features)
        :return:
        """
        return self._compute_visible_units_matrix(transformed_data)

    def _stochastic_gradient_descent(self, _data):
        """
        Performs stochastic gradient descend optimization algorithm.
        :param _data: array-like, shape = (n_samples, n_features)
        :return:
        """
        for iteration in range(1, self.n_epochs + 1):
            idx = np.random.permutation(len(_data))
            data = _data[idx]
            for batch in batch_generator(self.batch_size, data):
                if len(batch) < self.batch_size:
                    # Pad with zeros
                    pad = np.zeros((self.batch_size - batch.shape[0], batch.shape[1]), dtype=batch.dtype)
                    batch = np.vstack((batch, pad))
                sess.run(tf.variables_initializer(self.random_variables))  # Need to re-sample from uniform distribution
                sess.run([self.update_W, self.update_b, self.update_c],
                         feed_dict={self.visible_units_placeholder: batch})
            if self.verbose:
                error = self._compute_reconstruction_error(data)
                print(">> Epoch %d finished \tRBM Reconstruction error %f" % (iteration, error))

    def _contrastive_divergence(self, vector_visible_units):
        """
        Computes gradients using Contrastive Divergence method.
        :param vector_visible_units: array-like, shape = (n_features, )
        :return:
        """
        v_0 = vector_visible_units
        v_t = np.array(v_0)

        # Sampling
        for t in range(self.contrastive_divergence_iter):
            h_t = self._sample_hidden_units(v_t)
            v_t = self._compute_visible_units(h_t)

        # Computing deltas
        v_k = v_t
        h_0 = self._compute_hidden_units(v_0)
        h_k = self._compute_hidden_units(v_k)
        delta_W = np.outer(h_0, v_0) - np.outer(h_k, v_k)
        delta_b = v_0 - v_k
        delta_c = h_0 - h_k

        return delta_W, delta_b, delta_c

    def _sample_hidden_units(self, vector_visible_units):
        """
        Computes hidden unit activations by sampling from a binomial distribution.
        :param vector_visible_units: array-like, shape = (n_features, )
        :return:
        """
        hidden_units = self._compute_hidden_units(vector_visible_units)
        return (np.random.random_sample(len(hidden_units)) < hidden_units).astype(np.int64)

    def _sample_visible_units(self, vector_hidden_units):
        """
        Computes visible unit activations by sampling from a binomial distribution.
        :param vector_hidden_units: array-like, shape = (n_features, )
        :return:
        """
        visible_units = self._compute_visible_units(vector_hidden_units)
        return (np.random.random_sample(len(visible_units)) < visible_units).astype(np.int64)

    def _compute_hidden_units(self, vector_visible_units):
        """
        Computes hidden unit outputs.
        :param vector_visible_units: array-like, shape = (n_features, )
        :return:
        """
        v = np.expand_dims(vector_visible_units, 0)
        h = np.squeeze(self._compute_hidden_units_matrix(v))
        return np.array([h]) if not h.shape else h

    def _compute_hidden_units_matrix(self, matrix_visible_units):
        """
        Computes hidden unit outputs.
        :param matrix_visible_units: array-like, shape = (n_samples, n_features)
        :return:
        """
        return sess.run(self.compute_hidden_units_op,
                        feed_dict={self.visible_units_placeholder: matrix_visible_units})

    def _compute_visible_units(self, vector_hidden_units):
        """
        Computes visible (or input) unit outputs.
        :param vector_hidden_units: array-like, shape = (n_features, )
        :return:
        """
        h = np.expand_dims(vector_hidden_units, 0)
        v = np.squeeze(self._compute_visible_units_matrix(h))
        return np.array([v]) if not v.shape else v

    def _compute_visible_units_matrix(self, matrix_hidden_units):
        """
        Computes visible (or input) unit outputs.
        :param matrix_hidden_units: array-like, shape = (n_samples, n_features)
        :return:
        """
        return sess.run(self.compute_visible_units_op,
                        feed_dict={self.hidden_units_placeholder: matrix_hidden_units})

    def _compute_free_energy(self, vector_visible_units):
        """
        Computes the RBM free energy.
        :param vector_visible_units: array-like, shape = (n_features, )
        :return:
        """
        v = vector_visible_units
        return - np.dot(self.b, v) - np.sum(np.log(1 + np.exp(np.dot(self.W, v) + self.c)))

    def _compute_reconstruction_error(self, data):
        """
        Computes the reconstruction error of the data.
        :param data: array-like, shape = (n_samples, n_features)
        :return:
        """
        data_transformed = self.transform(data)
        data_reconstructed = self._reconstruct(data_transformed)
        return np.mean(np.sum((data_reconstructed - data) ** 2, 1))



In [None]:
class SupervisedDBNClassification():
    """
    Abstract class for supervised Deep Belief Network.
    """
    __metaclass__ = ABCMeta
    
    def __init__(self,
                 hidden_layers_structure=[100, 100],
                 activation_function='sigmoid',
                 optimization_algorithm='sgd',
                 learning_rate=1e-3,
                 learning_rate_rbm=1e-3,
                 n_iter_backprop=100,
                 l2_regularization=1.0,
                 n_epochs_rbm=10,
                 contrastive_divergence_iter=1,
                 rbm_batch_size=512,
                 ann_batch_size=32,
                 dropout_p=0,  # float between 0 and 1. Fraction of the input units to drop
                 verbose=True):
        self.hidden_layers_structure = hidden_layers_structure
        self.activation_function = activation_function
        self.optimization_algorithm = optimization_algorithm
        self.learning_rate_rbm = learning_rate_rbm
        self.n_epochs_rbm = n_epochs_rbm
        self.contrastive_divergence_iter = contrastive_divergence_iter
        self.rbm_batch_size = rbm_batch_size
        self.rbm_layers = None
        self.rbm_class = BinaryRBM
        self.n_iter_backprop = n_iter_backprop
        self.l2_regularization = l2_regularization
        self.learning_rate = learning_rate
        self.ann_batch_size = ann_batch_size
        self.dropout_p = dropout_p
        self.p = 1 - self.dropout_p
        self.verbose = verbose

    @classmethod
    def _get_param_names(cls):
        return ['hidden_layers_structure',
                'activation_function',
                'optimization_algorithm',
                'learning_rate_rbm',
                'n_epochs_rbm',
                'contrastive_divergence_iter',
                'rbm_batch_size',
                'n_iter_backprop',
                'l2_regularization',
                'learning_rate',
                'ann_batch_size',
                'dropout_p',
                'verbose',
                'label_to_idx_map', 
                'idx_to_label_map']

    @classmethod
    def _get_weight_variables_names(cls):
        return ['W', 'b']

    def _initialize_weights(self, weights):
        if weights:
            for attr_name, value in weights.items():
                self.__setattr__(attr_name, tf.Variable(value))
        else:
            if self.activation_function == 'sigmoid':
                stddev = 1.0 / np.sqrt(self.input_units)
                self.W = weight_variable(tf.random_normal, [self.input_units, self.num_classes], stddev)
                self.b = weight_variable(tf.random_normal, [self.num_classes], stddev)
                self._activation_function_class = tf.nn.sigmoid
            elif self.activation_function == 'relu':
                stddev = 0.1 / np.sqrt(self.input_units)
                self.W = weight_variable(tf.truncated_normal, [self.input_units, self.num_classes], stddev)
                self.b = bias_variable(stddev, [self.num_classes])
                self._activation_function_class = tf.nn.relu
            else:
                raise ValueError("Invalid activation function.")

    def save(self, save_path):
        import pickle

        with open(save_path, 'wb') as fp:
            pickle.dump(self.to_dict(), fp)

    @classmethod
    def load(cls, load_path):
        import pickle

        with open(load_path, 'rb') as fp:
            dct_to_load = pickle.load(fp)
            return cls.from_dict(dct_to_load)

    def to_dict(self):
        dct_to_save = {name: self.__getattribute__(name) for name in self._get_param_names()}
        dct_to_save.update(
            {name: self.__getattribute__(name).eval(sess) for name in self._get_weight_variables_names()})
        dct_to_save['rbm_layers'] = [rbm.to_dict() for rbm in self.rbm_layers]
        dct_to_save['num_classes'] = self.num_classes
        return dct_to_save

    @classmethod
    def from_dict(cls, dct_to_load):
        weights = {var_name: dct_to_load.pop(var_name) for var_name in cls._get_weight_variables_names()}
        num_classes = dct_to_load.pop('num_classes')
        label_to_idx_map = dct_to_load.pop('label_to_idx_map')
        idx_to_label_map = dct_to_load.pop('idx_to_label_map')
        rbm_layers = dct_to_load.pop('rbm_layers')
        
        instance = cls(**dct_to_load)
        
        setattr(instance, 'rbm_layers', [instance.rbm_class.from_dict(rbm) for rbm in rbm_layers])
        setattr(instance, 'num_classes', num_classes)
        setattr(instance, 'label_to_idx_map', label_to_idx_map)
        setattr(instance, 'idx_to_label_map', idx_to_label_map)
        # Initialize RBM parameters
        instance._build_model(weights)
        sess.run(tf.variables_initializer([getattr(instance, name) for name in cls._get_weight_variables_names()]))
        return instance

    def _build_model(self, weights=None):
        self.visible_units_placeholder = self.rbm_layers[0].visible_units_placeholder
        keep_prob = tf.placeholder(tf.float32)
        visible_units_placeholder_drop = tf.nn.dropout(self.visible_units_placeholder, keep_prob)
        self.keep_prob_placeholders = [keep_prob]

        # Define tensorflow operation for a forward pass
        rbm_activation = visible_units_placeholder_drop
        for rbm in self.rbm_layers:
            rbm_activation = rbm._activation_function_class(
                tf.transpose(tf.matmul(rbm.W, tf.transpose(rbm_activation))) + rbm.c)
            keep_prob = tf.placeholder(tf.float32)
            self.keep_prob_placeholders.append(keep_prob)
            rbm_activation = tf.nn.dropout(rbm_activation, keep_prob)

        self.transform_op = rbm_activation
        self.input_units = self.rbm_layers[-1].n_hidden_units

        # weights and biases
        self._initialize_weights(weights)

        if self.optimization_algorithm == 'sgd':
            self.optimizer = tf.train.GradientDescentOptimizer(self.learning_rate)
        else:
            raise ValueError("Invalid optimization algorithm.")

        # operations
        self.y = tf.matmul(self.transform_op, self.W) + self.b
        self.y_ = tf.placeholder(tf.float32, shape=[None, self.num_classes])
        self.output = tf.nn.softmax(self.y)
        self.cost_function = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=self.y, labels=tf.stop_gradient(self.y_)))
        self.train_step = self.optimizer.minimize(self.cost_function)

    def _fine_tuning(self, data, _labels):
        self.num_classes = self._determine_num_output_neurons(_labels)
        if self.num_classes == 1:
            _labels = np.expand_dims(_labels, -1)

        self._build_model()
        sess.run(tf.variables_initializer([self.W, self.b]))

        labels = self._transform_labels_to_network_format(_labels)

        if self.verbose:
            print("[START] Fine tuning step:")
        self._stochastic_gradient_descent(data, labels)
        if self.verbose:
            print("[END] Fine tuning step")

    def _stochastic_gradient_descent(self, data, labels):
        for iteration in range(self.n_iter_backprop):
            for batch_data, batch_labels in batch_generator(self.ann_batch_size, data, labels):
                feed_dict = {self.visible_units_placeholder: batch_data,
                             self.y_: batch_labels}
                feed_dict.update({placeholder: self.p for placeholder in self.keep_prob_placeholders})
                sess.run(self.train_step, feed_dict=feed_dict)

            if self.verbose:
                feed_dict = {self.visible_units_placeholder: data, self.y_: labels}
                feed_dict.update({placeholder: 1.0 for placeholder in self.keep_prob_placeholders})
                error = sess.run(self.cost_function, feed_dict=feed_dict)
                print(">> Epoch %d finished \tANN training loss %f" % (iteration, error))
    
    ###Not used
    def transform_dbn(self, X):
        """
        Transforms data using the fitted model.
        :param X: array-like, shape = (n_samples, n_features)
        :return:
        """
        input_data = X
        for rbm in self.rbm_layers:
            input_data = rbm.transform(input_data)
        return input_data

    ###Not used
    def transform_ann(self, X):
        feed_dict = {self.visible_units_placeholder: X}
        feed_dict.update({placeholder: 1.0 for placeholder in self.keep_prob_placeholders})
        return sess.run(self.transform_op,
                        feed_dict=feed_dict)

    def _compute_output_units_matrix(self, matrix_visible_units):
        feed_dict = {self.visible_units_placeholder: matrix_visible_units}
        feed_dict.update({placeholder: 1.0 for placeholder in self.keep_prob_placeholders})
        return sess.run(self.output, feed_dict=feed_dict)

    def fit(self, X, y=None):
        """
        Fits a model given data.
        :param X: array-like, shape = (n_samples, n_features)
        :param y : array-like, shape = (n_samples, )
        :param pre_train: bool
        :return:
        """
        self.pre_train(X)
        self._fine_tuning(X, y)
        return self

    def pre_train(self, X):
        """
        Apply unsupervised network pre-training.
        :param X: array-like, shape = (n_samples, n_features)
        :return:
        """
        self.rbm_layers = list()
        for n_hidden_units in self.hidden_layers_structure:
            rbm = self.rbm_class(n_hidden_units=n_hidden_units,
                                 activation_function=self.activation_function,
                                 optimization_algorithm=self.optimization_algorithm,
                                 learning_rate=self.learning_rate_rbm,
                                 n_epochs=self.n_epochs_rbm,
                                 contrastive_divergence_iter=self.contrastive_divergence_iter,
                                 batch_size=self.rbm_batch_size,
                                 verbose=self.verbose)
            self.rbm_layers.append(rbm)

        # Fit RBM
        if self.verbose:
            print("[START] Pre-training step:")
        input_data = X
        for rbm in self.rbm_layers:
            rbm.fit(input_data)
            input_data = rbm.transform(input_data)
        if self.verbose:
            print("[END] Pre-training step")
        return self


    def _transform_labels_to_network_format(self, labels):
        new_labels, label_to_idx_map, idx_to_label_map = to_categorical(labels, self.num_classes)
        self.label_to_idx_map = label_to_idx_map
        self.idx_to_label_map = idx_to_label_map
        return new_labels

    def _transform_network_format_to_labels(self, indexes):
        """
        Converts network output to original labels.
        :param indexes: array-like, shape = (n_samples, )
        :return:
        """
        return list(map(lambda idx: self.idx_to_label_map[idx], indexes))

    def predict(self, X):
        probs = self.predict_proba(X)
        indexes = np.argmax(probs, axis=1)
        return self._transform_network_format_to_labels(indexes)

    def predict_proba(self, X):
        """
        Predicts probability distribution of classes for each sample in the given data.
        :param X: array-like, shape = (n_samples, n_features)
        :return:
        """
        return self._compute_output_units_matrix(X)

    def predict_proba_dict(self, X):
        """
        Predicts probability distribution of classes for each sample in the given data.
        Returns a list of dictionaries, one per sample. Each dict contains {label_1: prob_1, ..., label_j: prob_j}
        :param X: array-like, shape = (n_samples, n_features)
        :return:
        """
        if len(X.shape) == 1:  # It is a single sample
            X = np.expand_dims(X, 0)

        predicted_probs = self.predict_proba(X)

        result = []
        num_of_data, num_of_labels = predicted_probs.shape
        for i in range(num_of_data):
            # key : label
            # value : predicted probability
            dict_prob = {}
            for j in range(num_of_labels):
                dict_prob[self.idx_to_label_map[j]] = predicted_probs[i][j]
            result.append(dict_prob)

        return result

    def _determine_num_output_neurons(self, labels):
        return len(np.unique(labels))


In [None]:
#examples.py
#np.random.seed(1337)  # for reproducibility
from time import time
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.metrics.classification import accuracy_score
'''
X = np.load(('/content/drive/My Drive/GP/dstl/walid/x_whole_10_478_16.npy'))
Y = np.load(('/content/drive/My Drive/GP/dstl/walid/y_trn_10.npy'))
X = computeNormal3D(X)
'''



"\nX = np.load(('/content/drive/My Drive/GP/dstl/walid/x_whole_10_478_16.npy'))\nY = np.load(('/content/drive/My Drive/GP/dstl/walid/y_trn_10.npy'))\nX = computeNormal3D(X)\n"

In [None]:
'''Y2 = np.zeros((Y.shape[0],Y.shape[1]))
for i in range(Y.shape[0]):
    for j in range(Y.shape[1]):
        curr = 10
        for k in range(Y.shape[2]):
            if Y[i,j,k] != 0:
                curr = k
        Y2[i,j]=curr
Y2 = Y2.flatten()'''

'Y2 = np.zeros((Y.shape[0],Y.shape[1]))\nfor i in range(Y.shape[0]):\n    for j in range(Y.shape[1]):\n        curr = 10\n        for k in range(Y.shape[2]):\n            if Y[i,j,k] != 0:\n                curr = k\n        Y2[i,j]=curr\nY2 = Y2.flatten()'

In [None]:
'''
X2 = np.zeros((X.shape[0]*X.shape[1],X.shape[2]))
for i in range(X.shape[0]):
    for j in range(X.shape[1]):
        X2[i*X.shape[0]+j]=X[i,j]
'''

'\nX2 = np.zeros((X.shape[0]*X.shape[1],X.shape[2]))\nfor i in range(X.shape[0]):\n    for j in range(X.shape[1]):\n        X2[i*X.shape[0]+j]=X[i,j]\n'

In [None]:
#print(X2.shape)
#np.save('/content/drive/My Drive/GP/dstl/walid/x_whole_10_478_16_flat',X2)
#np.save('/content/drive/My Drive/GP/dstl/walid/CDBN_512/y2_trn_10',Y2)


In [None]:
sess = tf.Session()

In [None]:
'''
# Loading dataset
digits = load_digits()
A, B = digits.data, digits.target

# Data scaling
A = (A / 16).astype(np.float32)
A_train, A_test, B_train, B_test = train_test_split(A, B, test_size=0.2, random_state=0)
classifier0 = SupervisedDBNClassification(hidden_layers_structure=[100, 100, 100],
                                         learning_rate_rbm=0.05,
                                         learning_rate=0.1,
                                         n_epochs_rbm=1,
                                         n_iter_backprop=25,
                                         rbm_batch_size=512,
                                         ann_batch_size=512,
                                         activation_function='sigmoid',
                                         dropout_p=0.2)
classifier0.fit(A_train, B_train)
B_pred = classifier0.predict(A_test)
print('Done.\nAccuracy: %f' % accuracy_score(B_test, B_pred))
classifier0.save('/content/drive/My Drive/GP/dstl/walid/test')
classifier0 = SupervisedDBNClassification.load('/content/drive/My Drive/GP/dstl/walid/test')
B_pred = classifier0.predict(A_test)
print('Done.\nAccuracy: %f' % accuracy_score(B_test, B_pred))
'''

"\n# Loading dataset\ndigits = load_digits()\nA, B = digits.data, digits.target\n\n# Data scaling\nA = (A / 16).astype(np.float32)\nA_train, A_test, B_train, B_test = train_test_split(A, B, test_size=0.2, random_state=0)\nclassifier0 = SupervisedDBNClassification(hidden_layers_structure=[100, 100, 100],\n                                         learning_rate_rbm=0.05,\n                                         learning_rate=0.1,\n                                         n_epochs_rbm=1,\n                                         n_iter_backprop=25,\n                                         rbm_batch_size=512,\n                                         ann_batch_size=512,\n                                         activation_function='sigmoid',\n                                         dropout_p=0.2)\nclassifier0.fit(A_train, B_train)\nB_pred = classifier0.predict(A_test)\nprint('Done.\nAccuracy: %f' % accuracy_score(B_test, B_pred))\nclassifier0.save('/content/drive/My Drive/GP/dstl/walid/t

In [None]:
# Splitting data
X = np.load(('/content/drive/My Drive/GP/dstl/walid/x_whole_10_478_16_flat.npy'))
Y = np.load(('/content/drive/My Drive/GP/dstl/walid/y_whole_10_478_flat.npy'))
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.8, random_state=0)

In [None]:
# Training
classifier = SupervisedDBNClassification(hidden_layers_structure=[100, 100, 100],
                                         learning_rate_rbm=0.05,
                                         learning_rate=0.1,
                                         n_epochs_rbm=3,
                                         n_iter_backprop=100,
                                         rbm_batch_size=256,
                                         ann_batch_size=256,
                                         activation_function='sigmoid',
                                         dropout_p=0.2)



In [22]:
classifier.fit(X_train, Y_train)
classifier.save('/content/drive/My Drive/GP/dstl/walid/model_256batch_256batch_3epochs_20percent')


[START] Pre-training step:
Instructions for updating:
Use `tf.cast` instead.
>> Epoch 1 finished 	RBM Reconstruction error 0.099128
>> Epoch 2 finished 	RBM Reconstruction error 0.094310
>> Epoch 3 finished 	RBM Reconstruction error 0.090825
>> Epoch 1 finished 	RBM Reconstruction error 0.079428
>> Epoch 2 finished 	RBM Reconstruction error 0.051985
>> Epoch 3 finished 	RBM Reconstruction error 0.046091
>> Epoch 1 finished 	RBM Reconstruction error 0.034584
>> Epoch 2 finished 	RBM Reconstruction error 0.019238
>> Epoch 3 finished 	RBM Reconstruction error 0.011648
[END] Pre-training step
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.
[START] Fine tuning step:
>> Epoch 0 finished 	ANN training loss 1.370503
>> Epoch 1 finished 	ANN training loss 1.323756
>> Epoch 2 finished 	ANN training loss 1.268703
>> Epoch 3 finished 	ANN training loss 1.240201
>> Epoch 4 finished 	ANN training loss 1.223378
>> Epoch 5 finished 	AN

In [23]:
# Test
Y_pred = classifier.predict(X_test)
print('Done.\nAccuracy with 10 classes: %f' % accuracy_score(Y_test, Y_pred))

Done.
Accuracy with 10 classes: 0.609748


In [24]:
Y_test2 = np.zeros_like(Y_test)
Y_pred2 = np.zeros_like(Y_pred)
for i in range(len(Y_test)):
    Y_test2[i] = Y_test[i]//2
    Y_pred2[i] = Y_pred[i]//2
print('Done.\nAccuracy with 5 classes: %f' % accuracy_score(Y_test2, Y_pred2))

Done.
Accuracy with 5 classes: 0.611361
