**This notebook is for testing purposes only. Refer to *Trainer.ipynb* for the full code, including visualization, model testing, and previous trials.**



# Essentials

In [None]:
# Mount Google Drive when running on Colab. Outside Colab the google.colab
# package is not importable, so the cell becomes a no-op instead of failing
# (keeps "Restart & Run All" working in every environment).
try:
    from google.colab import drive
except ImportError:
    drive = None

if drive is not None:
    drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#All needed imports
import cv2
import numpy as np
import pandas as pd
from itertools import product
from skimage.filters import sobel
from skimage import feature
import pathlib
#models
from sklearn.svm import SVC
#testing
import os
import time

**Note: Change PATH_TO_TEST_FOLDER and PATH_TO_TEST_DATA as described below**

In [None]:
#constants
# Pixel-area cut-off: getConnectedComponents keeps only components whose area is strictly greater than this.
MASK_CC_THRESHOLD=100
# getLinesPeaks divides the (max - min) range of the row histogram by this value to obtain its peak threshold.
LINES_PEAKS_DIVIDER=3
#paths
PATH_TO_TEST_FOLDER = "/content/drive/MyDrive/Pattern Project/Test Set" # Path to folder that will have results.txt, time.txt and data folder
PATH_TO_TEST_DATA = PATH_TO_TEST_FOLDER + "/data" # Data folder (one sub-folder per test case is listed from here)

# Preprocessing

In [None]:
def preProcessTheImage(image):
    """Crop a grey-scale page down to the region between its wide horizontal contours.

    Steps: blur, trim fixed side margins, Otsu-binarise (inverted), locate
    contours wider than 1000 px, use those in the upper/lower half of the page
    as top/bottom limits, then tighten all four sides to the first/last
    non-empty row/column of an eroded copy of the binary image.

    Returns a tuple ``(grey_crop, binary_crop)`` covering the same region.

    NOTE(review): raises if no contour wider than 1000 px exists in either
    half of the page (reduction over an empty array) -- confirm every input
    page contains such contours.
    """
    # Blur first, then drop 150 columns on the left and 50 on the right.
    blurred = cv2.GaussianBlur(image, (5, 5), 0)
    image = blurred[:, 150:-50]

    # Inverted Otsu threshold: ink becomes 255, paper becomes 0.
    _, bin_img = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    contours, _ = cv2.findContours(bin_img, cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
    height, width = image.shape

    # One (x, y, w, h) row per contour; keep only the very wide ones.
    boxes = np.asarray([np.asarray(cv2.boundingRect(c)) for c in contours])
    wideBoxes = boxes[boxes[:, 2] > 1000]
    inUpperHalf = wideBoxes[:, 1] < height // 2
    upperBoxes = wideBoxes[inUpperHalf]
    lowerBoxes = wideBoxes[~inUpperHalf]

    # Start 10 px below the lowest upper box and stop 10 px above the highest lower box.
    top = (upperBoxes[:, 1] + upperBoxes[:, 3]).max() + 10
    bottom = lowerBoxes[:, 1].min() - 10

    # Erode to suppress speckles before measuring where the ink actually starts/ends.
    eroded = cv2.erode(bin_img, np.ones((3, 3), np.uint8), iterations=2)
    rowSums = eroded.sum(axis=1)[top:bottom + 1]
    colSums = eroded.sum(axis=0)

    left = np.argmax(colSums != 0)
    right = (width - 1) - np.argmax(colSums[::-1] != 0)
    top = top + np.argmax(rowSums != 0)
    bottom = bottom - np.argmax(rowSums[::-1] != 0)

    rows = slice(top, bottom + 1)
    cols = slice(left, right + 1)
    return image[rows, cols], bin_img[rows, cols] #preprocessed image

# Segmentation

In [None]:
def getConnectedComponents(binaryImage):
  """Label the 8-connected components of a binary image and drop the small ones.

  Column layout of the stats array returned by OpenCV:
    cv2.CC_STAT_LEFT   leftmost (x) coordinate, inclusive start of the bounding box
    cv2.CC_STAT_TOP    topmost (y) coordinate, inclusive start of the bounding box
    cv2.CC_STAT_WIDTH  horizontal size of the bounding box
    cv2.CC_STAT_HEIGHT vertical size of the bounding box
    cv2.CC_STAT_AREA   total area (in pixels) of the connected component

  Returns (centroids, stats, labelImage) where centroids and stats contain only
  the components whose area exceeds MASK_CC_THRESHOLD (this removes commas,
  periods and similar small marks). The label image is returned unfiltered.
  """
  _, labelImage, allStats, allCentroids = cv2.connectedComponentsWithStats(binaryImage, connectivity=8)
  bigEnough = allStats[:, cv2.CC_STAT_AREA] > MASK_CC_THRESHOLD  # area is the last stats column
  return allCentroids[bigEnough], allStats[bigEnough], labelImage

In [None]:
def getHorizontalHist(binaryImage):
  """Return the horizontal projection (one sum per row) of the Sobel edge image."""
  edges = sobel(binaryImage)
  return edges.sum(axis=1)

In [None]:
def getLinesPeaks(horizontalHist,binaryImage):
  """Return the row indices whose histogram value is above the peak threshold.

  The threshold is the histogram's value range (max - min) divided by
  LINES_PEAKS_DIVIDER.

  Parameters:
    horizontalHist: 1-D array with one value per image row.
    binaryImage: unused; kept so existing callers do not have to change.

  Returns:
    1-D integer array of the row indices with ``horizontalHist > threshold``.
  """
  # The original also copied binaryImage and blanked the peak rows in the copy,
  # but that copy was never returned or read, so the work has been dropped.
  threshold = (np.max(horizontalHist) - np.min(horizontalHist)) / LINES_PEAKS_DIVIDER
  return np.where(horizontalHist > threshold)[0]

In [None]:
def consecutive(data, stepsize=1):
    """Split ``data`` into runs in which neighbouring values differ by ``stepsize``.

    Returns a list of arrays; a new run starts wherever the difference between
    two adjacent elements is not equal to ``stepsize``.
    """
    gaps = np.diff(data) != stepsize
    # A break between element k and k+1 means the next run starts at k+1.
    runStarts = np.nonzero(gaps)[0] + 1
    return np.split(data, runStarts)

In [None]:
def getLinesBoundaries(binaryImage):
  """Compute the row indices that separate consecutive text lines.

  Peak rows of the horizontal Sobel histogram are grouped into runs of
  consecutive rows; runs shorter than half of the longest run are ignored.
  Each boundary is placed midway between the end of one kept run and the start
  of the next. The list starts with 0 and ends with a boundary placed below the
  last run by the average half-gap.
  """
  rowProfile = getHorizontalHist(binaryImage)
  peakRows = getLinesPeaks(rowProfile, binaryImage)
  runs = consecutive(peakRows)

  # Only runs at least half as long as the longest one count as text lines.
  minRunLength = max(len(run) for run in runs) / 2
  keptRuns = [run for run in runs if len(run) >= minRunLength]
  runStarts = [run[0] for run in keptRuns]
  runEnds = [run[-1] for run in keptRuns]

  # Half of the vertical gap between each pair of neighbouring runs.
  halfGaps = [(runStarts[k + 1] - runEnds[k]) / 2 for k in range(len(runEnds) - 1)]

  lines = [0]
  lines.extend(int(end + gap) for end, gap in zip(runEnds, halfGaps))

  # Closing boundary: last run end plus the mean half-gap (zero when there is one run).
  gapTotal = sum(halfGaps)
  if len(runEnds) > 1:
    closingOffset = gapTotal / (len(runEnds) - 1)
  else:
    closingOffset = gapTotal
  lines.append(int(runEnds[-1] + closingOffset))

  return lines

In [None]:
def createTextureImage(greyImage,binaryImage): #converts the text image to a texture
  """Pack the connected components of a text image into a vertically compacted texture.

  Components (from getConnectedComponents) are copied from greyImage onto a
  canvas filled with 255, keeping their original columns but writing them at a
  running row offset (y_current) that advances by half of the mean component
  height each time a line boundary (from getLinesBoundaries) is crossed.

  Returns the canvas cropped to the rows that were written (plus 1.5x the last
  mean height) and to the tracked [x_start, x_end) column range.

  NOTE(review): the loop skips index 0 -- presumably the background component
  reported first by OpenCV; confirm.
  """
  lines=getLinesBoundaries(binaryImage) #get the separating lines

  centroids,stats,cc_output =getConnectedComponents(binaryImage) #get the connected components
  textureImage = np.full(binaryImage.shape,255) # canvas, every pixel starts at 255
  y_current = 0     # row of the canvas where the current line is written
  currentRange = 0  # index into lines: current line lies between lines[currentRange] and lines[currentRange+1]
  y_avg = 0         # mean component height of the most recently finished line
  y_sum = 0         # sum of component heights in the current line
  y_count = 0       # number of components copied for the current line

  # Snapshot of the previous line's accumulators, restored when stepping back a line.
  y_sum_prev=0
  y_count_prev = 0
  y_prev = 0 # written below but never read in this function
  
  x_start = stats[1,0] # raises IndexError if fewer than two components survived the area filter
  x_end = 0
  for i in range(1,len(centroids)):
    
    if currentRange+1 == len(lines):
      break # no further boundary: remaining components are ignored
    if lines[currentRange+1] > centroids[i,1] :
      # Centroid is above the next boundary; if it is not below the current
      # one either, it belongs to the previous line, so roll back.
      if not (lines[currentRange] < centroids[i,1]):
          currentRange -= 1
          y_avg = y_sum_prev / y_count_prev # division by zero if the previous line recorded no components
          y_sum = y_sum_prev
          y_count = y_count_prev
          # NOTE(review): y_current is not restored here -- confirm this is intended.
    else:
      # Centroid is past the next boundary: close the current line and start a new one.
      y_avg = y_sum / y_count # division by zero if no component was recorded for the current line
      y_avg_prev = y_avg # assigned but never read in this function
      y_prev = y_current
      y_current += int(y_avg/2) # advance by half the mean height, so lines are packed tightly
      currentRange += 1
      y_sum_prev=y_sum
      y_count_prev = y_count
      y_sum =0
      y_count =0

      
    # Track the horizontal extent of the copied components.
    if stats[i,0]< x_start:
        x_start = stats[i,0]
    # NOTE(review): this uses the previous component (i-1) and compares its left
    # edge against x_end before storing its right edge -- verify this is intended.
    if stats[i-1,0] > x_end:
      x_end = stats[i-1,0] + stats[i-1,2]  
  

    # Copy only the dark pixels (<= 200) of the component's bounding box onto the canvas.
    toBeCopied = greyImage[stats[i,1]: stats[i,1] + stats[i,3],stats[i,0] :stats[i,0] + stats[i,2]]
    toBeCopiedMask = toBeCopied <= 200
    textureImage[y_current:y_current+ stats[i,3],stats[i,0] :stats[i,0] + stats[i,2]][toBeCopiedMask] = toBeCopied[toBeCopiedMask]
    y_sum += stats[i,3]
    y_count += 1

  return textureImage[:int(y_current+y_avg*1.5),int(x_start):int(x_end)]

In [None]:
def divideTextureImageIntoBlocks(textureImage,divide=3):
    """Cut a 2-D texture image into ``divide * divide`` equally sized blocks.

    Rows and columns that do not fit into a whole number of blocks are trimmed
    from the bottom and right. Blocks are returned row-major (left to right,
    then top to bottom) as an array of shape
    ``(divide * divide, block_height, block_width)``.
    """
    rows, cols = textureImage.shape
    usableRows = rows - (rows % divide)
    usableCols = cols - (cols % divide)
    blockRows = usableRows // divide
    blockCols = usableCols // divide

    trimmed = textureImage[:usableRows, :usableCols]
    # (block row, row in block, block col, col in block) -> bring the two block axes together.
    grid = trimmed.reshape(usableRows // blockRows, blockRows, -1, blockCols)
    grid = np.swapaxes(grid, 1, 2)
    return grid.reshape(-1, blockRows, blockCols)

In [None]:
def convertImagesIntoTextureBlocks(dataset,authorMap,exp_beta=30):
  """Turn every (path, author) row of ``dataset`` into texture blocks with labels.

  Each image is preprocessed, converted to a texture and split into 9 blocks.
  With a probability driven by an exponential draw (scale
  ``authorMap[author] / exp_beta``), a randomly cropped copy is processed as
  well and ``authorMap[author]`` is incremented; crops shorter than 600 rows
  are discarded.

  Returns ``(textureBlocks, labels)`` as two parallel Python lists.

  NOTE(review): PATH_TO_A_D, PATH_TO_E_H, PATH_TO_I_Z and ``iaa`` are not
  defined in the visible part of this notebook -- calling this function here
  would fail unless they are provided elsewhere.
  """
  allBlocks = []
  allLabels = []
  for path, author in dataset.values:
    # The dataset is spread over three folders, selected by the first character of the file name.
    firstChar = path[0]
    if firstChar <= 'd':
      baseDir = PATH_TO_A_D
    elif firstChar <= 'h':
      baseDir = PATH_TO_E_H
    else:
      baseDir = PATH_TO_I_Z
    greyImage = cv2.imread(baseDir + path, cv2.IMREAD_GRAYSCALE)

    gImg, bImg = preProcessTheImage(greyImage)

    allLabels.extend(np.repeat(author, 9))
    allBlocks.extend(divideTextureImageIntoBlocks(createTextureImage(gImg, bImg)))

    # Decide whether this image also contributes an augmented (cropped) sample.
    draw = np.random.exponential(authorMap[author]/exp_beta)
    if not (draw <= 0.5):
      continue

    shortSide = np.min(gImg.shape)
    augmenter = iaa.Sequential([
      iaa.Crop(px=(shortSide//5, shortSide//3)),
    ])
    random_gImg = augmenter(images=gImg)
    if random_gImg.shape[0] < 600:
      continue

    _, random_bImg = cv2.threshold(random_gImg, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) #get the binary image
    allBlocks.extend(divideTextureImageIntoBlocks(createTextureImage(random_gImg, random_bImg)))
    allLabels.extend(np.repeat(author, 9))
    authorMap[author] += 1
  return allBlocks, allLabels

# Feature Extraction

In [None]:
def getFeaturesLBP(textureBlock, sampling_pixels=8,radius=3):
    """Compute a 256-bin Local Binary Pattern histogram for one texture block.

    Parameters:
      textureBlock: 2-D grey-scale block.
      sampling_pixels: number of neighbour points passed to local_binary_pattern.
      radius: radius passed to local_binary_pattern.

    Returns:
      1-D float array with 256 un-normalised bin counts.
    """
    # Min-max normalise the grey values (skipped for a constant block to avoid dividing by zero).
    i_min = np.min(textureBlock)
    i_max = np.max(textureBlock)
    if (i_max - i_min != 0):
        textureBlock = (textureBlock - i_min)/(i_max-i_min)

    # compute LBP
    lbp = feature.local_binary_pattern(textureBlock, sampling_pixels, radius)

    # Histogram of the LBP codes gives the 256 features.
    # NOTE(review): no explicit range is given, so the bin edges follow the
    # min/max of each block's codes -- confirm that is acceptable.
    (hist, _) = np.histogram(lbp.ravel(),bins=256)

    # np.float was removed in NumPy 1.24; it was an alias of the builtin float,
    # so this cast is equivalent and works on every NumPy version.
    hist = hist.astype(float)
    # Normalisation is intentionally left off; enable the next line if it is needed.
    #hist /= np.sum(hist)
    return hist

In [None]:
def featuresOfMultipleTextureBlocksLBP(textureBlocks,radius=3):
  """Stack the LBP feature vector of every texture block into one 2-D array.

  ``textureBlocks`` is a sequence of blocks; the result has one row per block.
  """
  return np.asarray([getFeaturesLBP(block, radius=radius) for block in textureBlocks])

# Testing

In [None]:
def wholePipeline(imagePath,author,radius=3,divide=3):
  """Run image -> preprocessing -> texture -> blocks -> LBP features for a labelled image.

  Returns ``(features, labels)``: one feature row per block
  (``divide * divide`` rows) and the author label repeated once per block.
  """
  greyImage = cv2.imread(str(imagePath), cv2.IMREAD_GRAYSCALE)
  croppedGrey, croppedBinary = preProcessTheImage(greyImage)
  texture = createTextureImage(croppedGrey, croppedBinary)
  blocks = divideTextureImageIntoBlocks(texture, divide=divide)
  features = featuresOfMultipleTextureBlocksLBP(blocks, radius=radius)
  labels = np.repeat(author, divide*divide)
  return features, labels

In [None]:
def wholePipelineForTest(imagePath,radius=3,divide=3):
  """Run image -> preprocessing -> texture -> blocks -> LBP features for an unlabelled image.

  Returns the feature matrix only, one row per block (``divide * divide`` rows).
  """
  greyImage = cv2.imread(str(imagePath), cv2.IMREAD_GRAYSCALE)
  croppedGrey, croppedBinary = preProcessTheImage(greyImage)
  texture = createTextureImage(croppedGrey, croppedBinary)
  blocks = divideTextureImageIntoBlocks(texture, divide=divide)
  return featuresOfMultipleTextureBlocksLBP(blocks, radius=radius)

In [None]:
def testTheModel(train,train_labels,test,radius=3,divide=3,C=5.0):
  """Train an SVC on the training images and predict the writer of one test image.

  Parameters:
    train: sequence of training image paths.
    train_labels: non-negative integer label for each training image.
    test: path of the image to classify.
    radius: LBP radius forwarded to the feature extraction.
    divide: each image is split into divide * divide blocks.
    C: SVC regularisation parameter.

  Returns:
    The label predicted for most blocks of the test image (majority vote).

  Raises:
    ValueError: if ``train`` is empty.
  """
  numberOfImages = len(train)
  if numberOfImages == 0:
    raise ValueError("train must contain at least one image")

  # NOTE(review): probability=True is kept from the original although only
  # predict() is used below -- confirm whether it is needed.
  classifier = SVC(C=C, gamma='auto', probability=True)
  numberOfBlocks=divide*divide
  # Size the buffers from the actual number of training images instead of a
  # hard-coded 6, so no all-zero rows labelled 0 leak into the training set
  # and larger training sets do not overflow the buffer.
  allFeatures = np.zeros((numberOfBlocks*numberOfImages,256),dtype=float)
  allLabels = np.zeros(numberOfBlocks*numberOfImages,dtype=int)

  for i in range(numberOfImages):
    features,labels=wholePipeline(train[i],train_labels[i],radius=radius,divide=divide)
    rows = slice(i*numberOfBlocks,(i+1)*numberOfBlocks)
    allFeatures[rows,:]=features
    allLabels[rows]=labels

  # Normalise each feature column by its maximum over the training set.
  maxColWise=np.max(allFeatures,axis=0)
  # A bin that is empty in every training block would give 0/0 = NaN and make
  # the fit fail; dividing by 1 leaves such a column at zero instead.
  maxColWise[maxColWise == 0] = 1.0
  allFeatures/=maxColWise
  classifier.fit(allFeatures, allLabels)

  testFeatures = wholePipelineForTest(test,radius=radius,divide=divide)
  testFeatures/=maxColWise # same scaling as the training features

  # Majority vote over the per-block predictions.
  prediction= np.bincount(classifier.predict(testFeatures)).argmax()

  return prediction

# Reading Data and Writing to files

In [None]:
# Collect the images of every test case, then classify each test image and
# write one prediction line to results.txt and one timing line to time.txt.
#
# Expected layout: PATH_TO_TEST_DATA/<sample>/<author>/<training images> plus
# the test image stored directly inside <sample>.
trainImages = []  # every training image path, in processing order
labels = []       # author label of each entry in trainImages
testImages = []   # every test image path, in processing order
samples = []      # one (train paths, train labels, test paths) tuple per sample folder

# NOTE(review): this is a lexicographic sort ("10" comes before "2") -- confirm
# the sample folder names are zero-padded or that this order is the expected one.
test_cases = os.listdir(PATH_TO_TEST_DATA)
test_cases.sort()
for sample in test_cases: #Sample folder level
    samplePath = pathlib.Path(PATH_TO_TEST_DATA+'/'+sample)
    if not samplePath.is_dir():
        continue # stray file next to the sample folders; iterdir() would raise on it

    sampleTrain, sampleLabels, sampleTests = [], [], []
    for author in samplePath.iterdir(): #Author in sample folder level
        if author.is_file():
            sampleTests.append(str(author))
        else:
            for image in author.iterdir(): #Image for author in sample folder level
                # The label is the last character of the author folder name.
                sampleLabels.append(int(str(author)[-1]))
                sampleTrain.append(str(image))

    trainImages += sampleTrain
    labels += sampleLabels
    testImages += sampleTests
    # Keep train and test images grouped per sample so a folder with an
    # unexpected number of images cannot shift the pairing of later samples.
    if sampleTrain and sampleTests:
        samples.append((sampleTrain, sampleLabels, sampleTests))


# The with-block guarantees both files are flushed and closed even if a test case raises.
with open(PATH_TO_TEST_FOLDER+'/'+"results.txt","w") as results, \
     open(PATH_TO_TEST_FOLDER+'/'+"time.txt","w") as time_file:
    for sampleTrain, sampleLabels, sampleTests in samples:
        for testImage in sampleTests:
            start = time.perf_counter() #start time
            answer = testTheModel(sampleTrain,sampleLabels,testImage,C=8.0) #Pipeline of the training images + test image
            end = time.perf_counter() #end time

            results.write(str(answer)+'\n') #Writing results
            time_file.write(str(np.round(end-start,2))+'\n') #Writing elapsed seconds