In [7]:
#ref: WorkSheet 7 & 8
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
from tqdm import tqdm
import pandas as pd
import re
import csv
# Base directory; pHash() joins every image path it is given onto this.
rootpath = './'
# Image folders: hash_all() reads the reference images from train_path,
# rank_pic_by_phash()/find_top_n()/find_best() read query images from test_path.
train_path, test_path = './train', './test'

In [2]:
#ref: https://www.jianshu.com/p/d787e1bd4481
def pHash(img,leng=32,wid=32):
    """Compute a perceptual hash (pHash) bit string for an image file.

    The image is loaded as grayscale, resized, transformed with a 2-D DCT,
    and the top-left 8x8 block of coefficients is thresholded against its
    own mean. Each coefficient becomes one character of the result.

    Parameters
    ----------
    img : str
        Image path; it is joined onto the module-level ``rootpath``.
    leng, wid : int
        Passed to ``cv2.resize`` as ``(leng, wid)``. OpenCV interprets that
        tuple as ``(width, height)``. Both should be at least 8 so that the
        8x8 slice yields 64 bits.

    Returns
    -------
    str
        String of '0'/'1' characters in row-major order ('1' where the
        coefficient is greater than the block mean); 64 characters with the
        default sizes.

    Raises
    ------
    OSError
        If OpenCV cannot read the file (missing, unreadable or not an image).
    """
    path = os.path.join(rootpath,img)
    gray = cv2.imread(path,cv2.IMREAD_GRAYSCALE)
    # cv2.imread signals failure by returning None instead of raising; fail
    # here with the offending path rather than later inside cv2.resize.
    if gray is None:
        raise OSError("pHash: could not read image %r" % path)
    gray = cv2.resize(gray, (leng, wid))
    dct = cv2.dct(np.float32(gray))
    # The top-left corner of the DCT holds the lowest-frequency coefficients.
    dct_roi = dct[0:8, 0:8]
    average = np.mean(dct_roi)
    bits = (dct_roi > average).ravel()
    return ''.join('1' if bit else '0' for bit in bits)

def Hamming_distance(hash1,hash2):
    """Return the similarity of two equal-length hashes as a value in [0, 1].

    Despite the name, this is *one minus* the normalised Hamming distance:
    1.0 means the hashes are identical and 0.0 means every position differs,
    so larger values indicate more similar inputs.

    Parameters
    ----------
    hash1, hash2 : sequence
        Sequences (e.g. '0'/'1' strings) compared position by position.

    Returns
    -------
    float
        ``1 - mismatches / len(hash1)``.

    Raises
    ------
    ValueError
        If the hashes differ in length or are empty. A Hamming distance is
        only defined for equal-length inputs, and an empty input would
        otherwise cause a division by zero.
    """
    if len(hash1) != len(hash2):
        raise ValueError(
            "hashes must have equal length, got %d and %d" % (len(hash1), len(hash2))
        )
    if len(hash1) == 0:
        raise ValueError("cannot compare empty hashes")
    mismatches = sum(a != b for a, b in zip(hash1, hash2))
    return 1-mismatches/len(hash1)

In [3]:
# File names (not full paths) of the .jpg images in the train and test folders.
# str.endswith is used instead of a regex so that only names that really end in
# ".jpg" are kept (the match is case-sensitive). The folders come from
# train_path / test_path so the listing and the later image reads agree.
# sorted() is applied because os.listdir returns entries in arbitrary order.
train_dir = sorted(name for name in os.listdir(train_path) if name.endswith('.jpg'))
test_dir = sorted(name for name in os.listdir(test_path) if name.endswith('.jpg'))

In [9]:
# Shared SIFT detector/descriptor; find_top_n() and find_best() both call
# sift.detectAndCompute() on this single instance.
sift = cv2.SIFT_create()

In [None]:
def hash_all():
    """Build the perceptual-hash lookup for the training images.

    Returns
    -------
    dict
        Maps every file name in the module-level ``train_dir`` to the
        ``pHash`` string of that file inside ``train_path``. Iteration is
        wrapped in ``tqdm`` so a progress bar is shown.
    """
    return {name: pHash(os.path.join(train_path, name)) for name in tqdm(train_dir)}

# Hashes are computed once here and passed to rank_pic_by_phash() for each query.
hash_values = hash_all()

In [None]:
def rank_pic_by_phash(input_img,num,hash_values):
    """Rank the hashed training images by pHash similarity to a test image.

    Parameters
    ----------
    input_img : str
        File name of the query image inside ``test_path``.
    num : int
        Maximum number of results to return.
    hash_values : dict
        Maps training file name to its pHash string (as built by
        ``hash_all``). Every entry of this mapping is scored, so the function
        does not depend on any other listing of the training folder.

    Returns
    -------
    list of (str, float)
        ``(file name, similarity)`` pairs, most similar first; ties are
        ordered by file name, descending. At most ``num`` entries.
    """
    query_hash = pHash(os.path.join(test_path,input_img))
    scored = [
        (name, Hamming_distance(query_hash, train_hash))
        for name, train_hash in hash_values.items()
    ]
    # Sorting on (score, name) gives a total order, so the result does not
    # depend on the iteration order of hash_values.
    scored.sort(key = lambda item:(item[1],item[0]), reverse=True)
    return scored[:num]

In [None]:
#target = rank_pic_by_phash("IMG5089_1.jpg",1000,hash_values)

In [None]:
def find_top_n(img_path, target,n):
    """Re-rank candidate training images by SIFT/FLANN match ratio.

    Parameters
    ----------
    img_path : str
        File name of the query image inside ``test_path``.
    target : iterable
        Candidates to score. Only element ``[0]`` of each entry (a training
        file name inside ``train_path``) is used, so the output of
        ``rank_pic_by_phash`` can be passed directly.
    n : int
        Maximum number of results to return.

    Returns
    -------
    list of (str, float)
        ``(file name, score)`` pairs where score is the fraction of query
        descriptors whose best match passes Lowe's ratio test (0.7). Sorted by
        score descending, ties by file name descending; at most ``n`` entries.
        Candidates with fewer than two keypoints are omitted, and the list is
        empty when the query image itself has fewer than two keypoints.

    Raises
    ------
    OSError
        If the query image cannot be read by OpenCV.
    """
    query_path = os.path.join(test_path,img_path)
    query_img = cv2.imread(query_path,cv2.IMREAD_GRAYSCALE)
    # cv2.imread returns None on failure instead of raising.
    if query_img is None:
        raise OSError("find_top_n: could not read image %r" % query_path)
    kp1,des1 = sift.detectAndCompute(query_img,None)
    # k=2 matching needs at least two descriptors on each side; with too few
    # query keypoints no candidate can be scored at all.
    if len(kp1) < 2:
        return []

    # FLANN parameters; the matcher is built once and reused for every
    # candidate because knnMatch receives the train descriptors explicitly.
    FLANN_INDEX_KDTREE = 1
    index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
    search_params = dict(checks=50)   # or pass empty dictionary
    flann = cv2.FlannBasedMatcher(index_params,search_params)

    res = {}
    for candidate in target:
        name = candidate[0]
        target_img = cv2.imread(os.path.join(train_path,name),cv2.IMREAD_GRAYSCALE)
        kp2,des2 = sift.detectAndCompute(target_img,None)
        if len(kp2) < 2:
            continue
        # Matching descriptors using the KNN algorithm
        matches = flann.knnMatch(des1, des2, k=2)
        # Lowe's ratio test. An entry with fewer than two neighbours cannot be
        # tested and is counted as a mismatch.
        good = sum(
            1 for pair in matches
            if len(pair) == 2 and pair[0].distance < 0.7*pair[1].distance
        )
        res[name] = good/len(matches)
    return sorted(res.items(), key = lambda x:(x[1],x[0]), reverse=True)[:n]

In [None]:
#top_50 = find_top_n("IMG5089_1.jpg",target,50)

In [None]:
#top_50

In [None]:
def find_best(top_n,img_path):
    """Return the first candidate that is geometrically consistent with the query.

    Candidates are tried in the order given. For each one, SIFT descriptors
    are matched with FLANN, filtered with Lowe's ratio test (0.7), and a
    homography is estimated with RANSAC from the surviving matches.

    Parameters
    ----------
    top_n : iterable
        Candidates to verify. Only element ``[0]`` of each entry (a training
        file name inside ``train_path``) is used, so the output of
        ``find_top_n`` can be passed directly.
    img_path : str
        File name of the query image inside ``test_path``.

    Returns
    -------
    str or None
        File name of the first candidate with more than ``MIN_MATCH_NUM`` good
        matches of which more than 15% are RANSAC inliers; ``None`` when no
        candidate qualifies.

    Raises
    ------
    OSError
        If the query image cannot be read by OpenCV.
    """
    top_n = list(top_n)
    if not top_n:
        return None

    # The query features do not depend on the candidate, so compute them once.
    query_path = os.path.join(test_path,img_path)
    img = cv2.imread(query_path,0)
    # cv2.imread returns None on failure instead of raising.
    if img is None:
        raise OSError("find_best: could not read image %r" % query_path)
    kp1, des1 = sift.detectAndCompute(img,None)
    # k=2 matching needs at least two descriptors on each side.
    if des1 is None or len(kp1) < 2:
        return None

    # FLANN parameters; the matcher is built once and reused for every
    # candidate because knnMatch receives the train descriptors explicitly.
    FLANN_INDEX_KDTREE = 1
    index_params = dict(algorithm = FLANN_INDEX_KDTREE, trees = 5)
    search_params = dict(checks=50)   # or pass empty dictionary
    flann = cv2.FlannBasedMatcher(index_params,search_params)
    MIN_MATCH_NUM = 4

    for entry in top_n:
        name = entry[0]
        candidate = cv2.imread(os.path.join(train_path,name),0)
        kp2, des2 = sift.detectAndCompute(candidate,None)
        if des2 is None or len(kp2) < 2:
            continue
        # Matching descriptors using the KNN algorithm
        matches = flann.knnMatch(des1,des2,k=2)
        # Keep the good matches as per Lowe's ratio test; entries with fewer
        # than two neighbours cannot be tested and are dropped.
        good = [
            pair[0] for pair in matches
            if len(pair) == 2 and pair[0].distance < 0.7*pair[1].distance
        ]
        if len(good) <= MIN_MATCH_NUM:
            continue
        ptsA = np.float32([kp1[m.queryIdx].pt for m in good]).reshape(-1,1,2)
        ptsB = np.float32([kp2[m.trainIdx].pt for m in good]).reshape(-1,1,2)
        H, status = cv2.findHomography(ptsA,
                                       ptsB,
                                       cv2.RANSAC,
                                       ransacReprojThreshold = 5,
                                       maxIters = 10) # try to change maxIters and see the effect
        # findHomography yields no mask when it cannot estimate a model.
        if status is None:
            continue
        inliers = int(np.count_nonzero(status))
        # Inlier-ratio threshold; the original note says this is meant to
        # avoid matching walls (presumably featureless backgrounds).
        if(inliers/len(good) > 0.15):
            return name
    return None


In [None]:
#find_best(top_50,"IMG5089_1.jpg")

In [None]:
# Table for the training images; main_func() looks rows up by the "id" column
# and reads their "x" and "y" columns.
train_csv = pd.read_csv("train.csv")
# All values of imagenames.csv flattened into a 1-D array; main_func() appends
# ".jpg" to each entry to form the test file name.
image_names = pd.read_csv("imagenames.csv").values.flatten()
def main_func(hash_num=1000,flann_num=50):
    """Match every test image to a training image and record its coordinates.

    For each entry of the module-level ``image_names`` the pipeline is:
    ``rank_pic_by_phash`` (keep ``hash_num`` candidates), then ``find_top_n``
    (asked for ``flann_num`` candidates), then ``find_best``. When a match is
    found, its ``x``/``y`` values are looked up in ``train_csv`` by ``id`` and
    a row ``(test image name, x, y)`` is appended to ``test.csv``.

    The output file is opened in append mode, so re-running adds rows to any
    existing ``test.csv`` rather than replacing it.

    Parameters
    ----------
    hash_num : int
        Number of candidates kept by the pHash stage.
    flann_num : int
        Number of candidates requested from the SIFT/FLANN stage.

    Returns
    -------
    list of str
        Test file names (with ``.jpg``) for which no row was written. Each is
        also printed as it occurs.
    """
    suffix = ".jpg"
    errors = []
    for image_name in tqdm(image_names):
        test_file = image_name + suffix
        pHash_top = rank_pic_by_phash(test_file,hash_num,hash_values)
        flann_top = find_top_n(test_file,pHash_top,flann_num)
        best_match = find_best(flann_top,test_file)

        rows = None
        if best_match:
            # Strip only a trailing ".jpg"; a regex such as ".jpg" would also
            # delete any "<char>jpg" occurring elsewhere in the name.
            if best_match.endswith(suffix):
                best_id = best_match[:-len(suffix)]
            else:
                best_id = best_match
            rows = train_csv.loc[train_csv["id"]==best_id]

        # No match, or a match whose id is absent from train_csv: record the
        # failure and move on instead of aborting the whole run.
        if rows is None or rows.empty:
            errors.append(test_file)
            print(test_file)
            continue

        x = rows["x"].values[0]
        y = rows["y"].values[0]
        # One open/close per row keeps finished results on disk if a later
        # image fails; newline='' is what the csv module expects of its files.
        with open('test.csv','a',newline='') as f:
            csv.writer(f).writerow((image_name,x,y))
    return errors

In [None]:
# Run the full pipeline; err holds the test file names that main_func()
# reported as failures (rows for the rest are appended to test.csv).
err = main_func(hash_num=1000,flann_num=50)