In [1]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
import cv2 as cv
from PIL import Image
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from os import listdir, path, remove
import os
import zipfile
from time import time
import re
import random
from functools import reduce

pd.set_option('display.max_colwidth',160)

In [2]:
def parseKeypoints(kp):
    """Convert an iterable of keypoint objects into a list of plain dicts.

    Each element of ``kp`` must expose the attributes ``pt``, ``size``,
    ``angle``, ``class_id``, ``octave`` and ``response``. They are copied, in
    that order, into the keys 'center', 'diameter', 'angle', 'class_id',
    'octave' and 'response'.
    """
    records = []
    for point in kp:
        records.append(dict(
            center=point.pt,
            diameter=point.size,
            angle=point.angle,
            class_id=point.class_id,
            octave=point.octave,
            response=point.response,
        ))
    return records

def kpdfsort(kp):
    """Return the keypoints as a DataFrame ordered by descending 'response'.

    The keypoints are first flattened with ``parseKeypoints``; only the
    'center', 'diameter', 'angle' and 'response' columns are kept.
    """
    shown_columns = ['center', 'diameter', 'angle', 'response']
    frame = pd.DataFrame(parseKeypoints(kp))
    ranked = frame.sort_values(by=['response'], ascending=False)
    return ranked[shown_columns]

def orbparams(orb):
    """Collect the values returned by an ORB object's getters into a dict.

    The keys are the getter names without the ``get`` prefix; the getters are
    called in the order the keys are listed below.
    """
    return {
        'DefaultName': orb.getDefaultName(),
        'EdgeThreshold': orb.getEdgeThreshold(),
        'FastThreshold': orb.getFastThreshold(),
        'FirstLevel': orb.getFirstLevel(),
        'MaxFeatures': orb.getMaxFeatures(),
        'NLevels': orb.getNLevels(),
        'PatchSize': orb.getPatchSize(),
        'ScaleFactor': orb.getScaleFactor(),
        'ScoreType': orb.getScoreType(),
        'WTA_K': orb.getWTA_K(),
    }

def byte2hex(bt):
    """Render an integer as lowercase hex, prefixing one '0' when it is below 16."""
    # hex() yields e.g. '0x1f'; keep only what follows the 'x'.
    digits = hex(bt).partition('x')[2]
    return digits if bt >= 16 else '0' + digits

# Work around the Chinese-path problem when reading images.
def cv_imread(file_path):
    """Read an image with ``cv.imread`` from inside the file's own directory.

    The process temporarily changes its working directory to the directory
    part of ``file_path`` (if there is one) and passes only the file name to
    ``cv.imread``, so the directory portion of the path never reaches OpenCV.

    Args:
        file_path: path of the image file.

    Returns:
        Whatever ``cv.imread`` returns for the file name.

    Raises:
        OSError: if the directory part of ``file_path`` cannot be entered.
    """
    root_dir, file_name = os.path.split(file_path)
    pwd = os.getcwd()
    try:
        if root_dir:
            os.chdir(root_dir)
        return cv.imread(file_name)
    finally:
        # Always restore the original working directory, even when reading
        # fails, so later relative paths in the notebook keep resolving.
        os.chdir(pwd)

class ImFeature:
    """Feature detection, description and matching built on OpenCV.

    Attributes (set in ``__init__``):
        algf: the detector/descriptor object (SIFT, SURF or ORB).
        alg: the algorithm name passed by the caller ('sift', 'surf' or
            anything else, which selects ORB).
        matcher: brute-force matcher, created lazily by ``match``.
        flann_matcher: FLANN matcher, created lazily by ``flannMatch``.
        store: cache mapping image path -> {'bgr': ..., 'gray': ...}.
    """

    def __init__(self, alg=None, k=500):
        """Create the detector.

        Args:
            alg: 'sift' or 'surf' select the corresponding ``cv.xfeatures2d``
                factory; any other value selects ``cv.ORB_create``.
            k: forwarded to ``cv.ORB_create`` (unused for SIFT/SURF).
        """
        if alg == 'sift':
            self.algf = cv.xfeatures2d.SIFT_create()
        elif alg == 'surf':
            self.algf = cv.xfeatures2d.SURF_create()
        else:
            self.algf = cv.ORB_create(k)
        self.alg = alg
        self.matcher = None
        self.flann_matcher = None
        self.store = dict()

    def read(self, img_path):
        """Load an image (cached by path) and return ``(bgr, gray)``.

        Raises whatever ``cv.cvtColor`` raises when the image could not be
        read; in that case nothing is cached, so a later call retries.
        """
        if img_path not in self.store:
            bgr = cv_imread(img_path)
            gray = cv.cvtColor(bgr, cv.COLOR_BGR2GRAY)
            # Cache only after both steps succeeded; inserting an empty entry
            # first would make every later call fail with KeyError instead of
            # retrying the read.
            self.store[img_path] = {'bgr': bgr, 'gray': gray}
        entry = self.store[img_path]
        return entry['bgr'], entry['gray']

    def keypoint(self, im):
        """Detect keypoints.

        Args:
            im: an image path (str) or an image array (np.ndarray).

        Returns:
            ``(image, keypoints)`` where ``image`` is the cached grayscale
            image for a path, or ``im`` itself for an array;
            ``(None, None)`` for any other input type.
        """
        if isinstance(im, str):
            _, gray = self.read(im)
            return gray, self.algf.detect(gray, None)
        elif isinstance(im, np.ndarray):
            return im, self.algf.detect(im, None)
        return None, None

    def descriptor(self, img, kp):
        """Return the detector's ``compute(img, kp)`` result."""
        return self.algf.compute(img, kp)

    def fingerprint(self, descriptor):
        """Concatenate every value of every descriptor row as hex text.

        Each value is rendered with ``byte2hex``. Returns '' when
        ``descriptor`` is None or empty.
        """
        if descriptor is None:
            # No descriptor at all: return an empty fingerprint instead of
            # failing on iteration over None.
            return ''
        return ''.join(''.join(byte2hex(d) for d in dps) for dps in descriptor)

    def feature(self, im):
        """Run ``detectAndCompute`` on an image path or image array.

        Returns the detector's result, or ``(None, None)`` for any other
        input type. Paths are processed on the cached grayscale image.
        """
        if isinstance(im, str):
            _, gray = self.read(im)
            return self.algf.detectAndCompute(gray, None)
        elif isinstance(im, np.ndarray):
            return self.algf.detectAndCompute(im, None)
        return None, None

    def fastFeature(self, im):
        """Detect keypoints on the grayscale image at path ``im`` with a FAST detector."""
        _, gray = self.read(im)
        fast = cv.FastFeatureDetector_create()
        return fast.detect(gray, None)

    def match(self, im1, im2, k=None):
        """Brute-force match the descriptors of two images.

        Args:
            im1, im2: image paths or arrays accepted by ``feature``.
            k: when None, ``matcher.match`` is used; otherwise
                ``matcher.knnMatch(des1, des2, k)``.

        Returns:
            The matcher's result for the chosen call.
        """
        _, des1 = self.feature(im1)
        _, des2 = self.feature(im2)
        if self.matcher is None:
            if self.alg in ('sift', 'surf'):
                self.matcher = cv.BFMatcher()
            else:
                self.matcher = cv.BFMatcher(cv.NORM_HAMMING, crossCheck=True)
        if k is None:
            return self.matcher.match(des1, des2)
        # NOTE(review): the non-SIFT/SURF matcher is built with
        # crossCheck=True; OpenCV's BFMatcher docs describe cross-checking
        # for knnMatch with k=1 only -- confirm before using k > 1 here.
        return self.matcher.knnMatch(des1, des2, k)

    def flannMatch(self, im1, im2):
        """Match the descriptors of two images with a FLANN-based matcher.

        For 'sift'/'surf' a KD-tree index is used and the result of
        ``knnMatch(des1, des2, k=2)`` is returned; otherwise an LSH index is
        used and the result of ``match(des1, des2)`` is returned.
        """
        _, des1 = self.feature(im1)
        _, des2 = self.feature(im2)
        float_descriptors = self.alg == 'sift' or self.alg == 'surf'
        if self.flann_matcher is None:
            if float_descriptors:
                FLANN_INDEX_KDTREE = 1
                index_params = dict(algorithm=FLANN_INDEX_KDTREE, trees=5)
            else:
                FLANN_INDEX_LSH = 6
                index_params = dict(algorithm=FLANN_INDEX_LSH,
                                    table_number=6,       # 12
                                    key_size=12,          # 20
                                    multi_probe_level=1)  # 2
            search_params = dict(checks=50)
            self.flann_matcher = cv.FlannBasedMatcher(index_params, search_params)
        if float_descriptors:
            return self.flann_matcher.knnMatch(des1, des2, k=2)
        return self.flann_matcher.match(des1, des2)

class ImSim:
    """Image similarity score based on close FLANN matches between two images."""

    def __init__(self, k=500, max_distance=10):
        """Create the underlying ``ImFeature`` with the default detector.

        Args:
            k: forwarded to ``ImFeature``; also the denominator of ``calcSim``.
            max_distance: matches whose ``distance`` is not strictly below
                this value are discarded (defaults to the previous fixed 10).
        """
        self.k = k
        self.max_distance = max_distance
        self.feature = ImFeature(k=k)

    def match(self, img1, img2):
        """Return the matches with ``distance < max_distance``, closest first."""
        matches = self.feature.flannMatch(img1, img2)
        close = [m for m in matches if m.distance < self.max_distance]
        return sorted(close, key=lambda m: m.distance)

    def calcSim(self, img1, img2):
        """Return the number of close matches divided by ``k``."""
        return len(self.match(img1, img2)) / self.k

In [3]:
class DataSet():
    """Recursive collection of image paths found under a root directory.

    Attributes:
        allimgs: list of paths whose names end in .jpg, .jpeg or .png,
            gathered from ``root`` and all of its sub-directories.
    """

    def __init__(self, root='.'):
        """Scan ``root`` recursively and store the image paths in ``allimgs``."""
        # The dot is escaped so that only a real extension matches; the
        # unescaped form also accepted names such as '3jpg'.
        is_image = re.compile(r'\.(jpg|jpeg|png)$').search

        def findimgs(folder):
            # One listdir per directory: files of this directory first, then
            # the images of each sub-directory in listing order.
            images, subdirs = [], []
            for entry in listdir(folder):
                full = path.join(folder, entry)
                if path.isdir(full):
                    subdirs.append(full)
                elif path.isfile(full) and is_image(full):
                    images.append(full)
            for sub in subdirs:
                images += findimgs(sub)
            return images

        self.allimgs = findimgs(root)

    def sort(self, label=None):
        """Return ``allimgs`` sorted by the integer before the first '.' of the file name.

        Args:
            label: optional list of substrings. For every label at position
                ``i`` that occurs in a path, ``i * 1000`` is added to that
                path's sort key. Anything that is not a list is ignored.

        Raises:
            ValueError: if a file name does not start with an integer stem.
        """
        def number_of(img):
            return int(path.basename(img).split('.')[0])

        if not isinstance(label, list):
            return sorted(self.allimgs, key=number_of)

        def key(img):
            num = number_of(img)
            for i, w in enumerate(label):
                if w in img:
                    num += i * 1000
            return num

        return sorted(self.allimgs, key=key)

    def group(self, label):
        """Map each label to the sorted image paths that contain it."""
        imgs = self.sort(label)
        gd = dict()
        for item in label:
            gd.setdefault(item, list()).extend(img for img in imgs if item in img)
        return gd

    def df(self, label):
        """Return ``group(label)`` as a DataFrame with one column per label."""
        return pd.DataFrame(self.group(label))

def unzipimgs(root):
    """Extract every '.zip' archive directly inside ``root`` into ``root``.

    For each archive the base name is printed, all members are extracted
    into ``root``, and the archive file is then deleted. Only the entries
    present when the function starts are considered.
    """
    for file_name in listdir(root):
        name, ext = path.splitext(file_name)
        if ext != '.zip':
            continue
        print(name)
        archive_path = path.join(root, file_name)
        # The context manager closes the archive even if extraction fails;
        # the archive is removed only after a successful extraction.
        with zipfile.ZipFile(archive_path, 'r') as archive:
            archive.extractall(root)
        remove(archive_path)

def randpick(dataset, n=10):
    """Pick ``n`` distinct random rows of ``dataset``.

    Args:
        dataset: object supporting ``len()`` and ``.iloc`` (e.g. a DataFrame).
        n: number of rows to draw; values <= 0 select nothing.

    Returns:
        ``(positions, rows)``: the list of chosen row positions and
        ``dataset.iloc[positions]``.

    Raises:
        ValueError: if ``n`` exceeds ``len(dataset)`` (a rejection loop would
            never terminate in that case).
    """
    slen = len(dataset)
    if n > slen:
        raise ValueError(
            'cannot pick {} distinct rows from a dataset of {} rows'.format(n, slen))
    ds = random.sample(range(slen), max(n, 0))
    # The drawn numbers are row positions, so select positionally; for the
    # default integer index this is identical to label-based selection.
    return ds, dataset.iloc[ds]

def randpickoneimg(dataset=None):
    """Pick one random row and load the image named in its first column.

    Args:
        dataset: frame to draw from; when omitted, the notebook-level ``df2``
            is used (the previous, implicit behaviour).

    Returns:
        ``(image, pickdata)``: the image opened with PIL as a numpy array,
        and the list of the picked row's values, one per column.

    Prints the first and last value of the picked row.
    """
    if dataset is None:
        # Resolved at call time, so the current binding of df2 is used.
        dataset = df2
    pickdf = randpick(dataset, 1)[1]
    pickdata = [list(pickdf[col])[0] for col in pickdf.columns]
    print('{} \t {}'.format(pickdata[0], pickdata[-1]))
    return np.asarray(Image.open(pickdata[0])), pickdata

In [23]:
# for i in range(3, 11):
# file_suffix = '110000-113891'

# fp = pd.read_csv('fingerprint_{}.csv'.format(file_suffix))

# fp_df = pd.DataFrame({ 'fingerprint_long': fp['0'] })

# fp_df.to_csv('fingerprint_{}.csv'.format(file_suffix), index=False)

In [45]:
# Load two saved fingerprint batches and check the size of their concatenation.
file_suffix = '100000-110000'
df1 = pd.read_csv('fingerprint_{}.csv'.format(file_suffix))

file_suffix = '110000-113891'
df2 = pd.read_csv('fingerprint_{}.csv'.format(file_suffix))

# Series.append was removed in pandas 2.0; pd.concat with ignore_index=True
# builds the same single-column frame with a fresh 0..n-1 index.
len(pd.concat([df1['fingerprint_long'], df2['fingerprint_long']], ignore_index=True).to_frame())

len(df2)

10000

In [42]:
# Toy check of stacking one column from two frames.
# Note: this rebinds df1 and df2, which the fingerprint-loading cell also assigns.
df1 = pd.DataFrame(dict(a=[1, 2, 3]))
df2 = pd.DataFrame(dict(a=[4, 5, 6]))

# Series.append was removed in pandas 2.0; pd.concat with ignore_index=True
# gives the same column 'a' with a fresh 0..5 index.
pd.concat([df1['a'], df2['a']], ignore_index=True).to_frame()

Unnamed: 0,a
0,1
1,2
2,3
3,4
4,5
5,6


#### 长指纹提取

In [None]:
# Compute a hex fingerprint for every image listed in train_set.csv, starting
# at row `start`, and write the results to 'fingerprint_<first>-<end>.csv'.
# The loop stops after writing one file: either when the next multiple of
# batch_size is reached or after the last image has been processed.
df = pd.read_csv('train_set.csv')
imf = ImFeature(k=50)

fingerprints = list()

time_count = 100        # report elapsed time every `time_count` images
time_tracker = list()   # (index, seconds since the previous report)

batch_num = 0           # number of batch boundaries passed so far
batch_size = 10000

img_paths = df['path']
last_idx = len(img_paths) - 1

start = 110000          # first row to process; earlier rows are skipped

for i, img_path in enumerate(img_paths):
    if i > 0 and i % batch_size == 0:
        batch_num += 1
    if i < start:
        continue
    if i == start:
        start_time = time()
    elif i % time_count == 0:
        time_tracker.append((i, time() - start_time))
        print(*time_tracker[-1])
        start_time = time()

    if i > start and i % batch_size == 0:
        # Rows [first, i) are in `fingerprints`; deriving the index from the
        # list length keeps it consistent even when `start` is not a multiple
        # of batch_size (for an aligned start the file name is unchanged).
        first = i - len(fingerprints)
        pd.DataFrame(fingerprints, index=range(first, i)).to_csv('fingerprint_{}-{}.csv'.format(first, i))
        fingerprints = list()
        print('save batch to csv, cost {} s/10000pcs!'.format(pd.DataFrame(time_tracker, columns=['index', 'cost'])['cost'].sum()))
        break

    try:
        img, kps = imf.keypoint(img_path)
        descriptor = imf.descriptor(img, kps)[1]
        fingerprint = imf.fingerprint(descriptor)
    except Exception as exc:
        # Report the real failure; a bare except also swallowed
        # KeyboardInterrupt and always blamed memory.
        print('failed to fingerprint {}: {!r}'.format(img_path, exc))
        break

    if i == last_idx:
        time_tracker.append((i+1, time() - start_time))
        print(*time_tracker[-1])
        fingerprints.append(fingerprint)
        # Rows [first, i + 1) are in `fingerprints`; this also covers the case
        # where the last image completes a full batch.
        first = i + 1 - len(fingerprints)
        pd.DataFrame(fingerprints, index=range(first, i + 1)).to_csv('fingerprint_{}-{}.csv'.format(first, i + 1))
        print('complete!')
        break

    fingerprints.append(fingerprint)

In [None]:
# mean_cout_per100 = pd.DataFrame(time_tracker, columns=['index', 'cost'])['cost'].mean()
# mean_cout_per100
# unzipimgs('images/train')
# dataset = DataSet('images/train')
# df = dataset.df(['餐厅_北欧极简', 'filter', 'rotate', 'crop'])
# df

# rtype, rstyle = dataset.allimgs[0].replace('\\', '/').split('/')[2].split('_')

# rtype, rstyle

# paths = list()
# types = list()
# styles = list()

# for img in dataset.allimgs:
#     paths.append(img.replace('\\', '/'))
#     rtype, rstyle = paths[-1].split('/')[2].split('_')
#     types.append(rtype)
#     styles.append(rstyle)

# df = pd.DataFrame({'path': paths, 'type': types, 'style': styles})

# df.to_csv('train_set.csv', index=False)

# Scratch check: build a frame from a list of two strings with an explicit 0..1 index.
pd.DataFrame(['asdf', 'asddasssd'], index=range(2))

In [None]:
# Pick one random row, display its image, then compute descriptors for it.
img, pickdata = randpickoneimg()
plt.imshow(img)

# The first value of the picked row is the one randpickoneimg opened as an image.
img_path = pickdata[0]

imf = ImFeature(k=50)
# For a path, keypoint() returns the cached grayscale image and its keypoints;
# `img` is rebound from the PIL-loaded array to that grayscale image.
img, kp = imf.keypoint(img_path)

# descriptor() returns the detector's compute() result; element [1] is displayed.
imf.descriptor(img, kp)[1]

In [None]:
# kpdfsort(kp)
# Draw two random rows from df2 and score the similarity of their images.
pickdf = randpick(df2, 2)[1]

# NOTE(review): needs df2 to carry a 'path' column; the frames assigned to df2
# in the cells above do not show one -- confirm which df2 is expected here.
img1, img2 = list(pickdf['path'])

imsim = ImSim(k=50)
imsim.calcSim(img1, img2)

In [None]:
# Detect keypoints on the first image of the pair (img1 comes from the previous cell).
img, kp = imf.keypoint(img1)

# Shape of element [1] of the detector's compute() result for those keypoints.
imf.descriptor(img, kp)[1].shape

In [None]:
# Tabulate the keypoints by descending response, then round the coordinates
# and diameters in place for easier reading.
kps = kpdfsort(kp)
kps['center'] = [(round(x), round(y)) for (x, y) in kps['center']]
kps['diameter'] = [round(d) for d in kps['diameter']]

In [None]:
# Re-read the first image via cv_imread (cv.imread result); rebinds `img`.
img = cv_imread(img1)

In [None]:
# Downscale the image to 200x200 with bicubic interpolation and display it.
img_resize = cv.resize(img, (200, 200), interpolation=cv.INTER_CUBIC)
# Images loaded through cv.imread are in BGR channel order while matplotlib
# interprets arrays as RGB; convert only for display so img_resize itself
# stays unchanged for the following cells.
plt.imshow(cv.cvtColor(img_resize, cv.COLOR_BGR2RGB))

In [None]:
# Keypoints of the resized image, tabulated by descending response.
kpdf = kpdfsort(imf.keypoint(img_resize)[1])

# NOTE(review): this rounds `kps` (the table of the original image) rather than
# the `kpdf` just computed -- possibly a copy/paste slip; confirm the intended target.
kps['center'] = [(round(x), round(y)) for (x, y) in kps['center']]