In [5]:
import numpy as np
import pandas as pd
import cv2 as cv
from sklearn.utils import shuffle
from tensorflow.keras import backend as K
from tensorflow.keras.models import load_model

In [25]:
# Index pairs of hand landmarks that impersonateLm joins with a line.
# TODO: rename connections to handLandmarkConnections
connections = [
    (0, 1), (1, 2), (2, 3), (3, 4),
    (5, 6), (6, 7), (7, 8),
    (9, 10), (10, 11), (11, 12),
    (13, 14), (14, 15), (15, 16),
    (17, 18), (18, 19), (19, 20),
    (0, 5), (5, 9), (9, 13), (13, 17), (0, 17)
]
   
# Add uniform random noise to landmark samples
def addNoise(samplesPerLm, landmarks, noiseLimit):
    """Return ``samplesPerLm`` noisy copies of every landmark set.

    Every x and y coordinate is shifted by an integer drawn from
    ``[-noiseLimit, noiseLimit)`` plus a fraction drawn from ``[0, 1)``.
    Only the first two values of each point are kept.

    Args:
        samplesPerLm: number of noisy copies generated per landmark set.
        landmarks: list or ndarray holding a batch of landmark sets (3-D),
            or a single landmark set (2-D), which is treated as a batch of one.
        noiseLimit: bound of the integer part of the noise.

    Returns:
        ndarray with ``samplesPerLm`` consecutive noisy copies of each input
        landmark set, in input order.

    Raises:
        AssertionError: if the input is neither 2-D nor 3-D.
    """
    if type(landmarks) == list:
        landmarks = np.asarray(landmarks)

    # Promote a single landmark set BEFORE the rank check; the original order
    # made this branch unreachable (and called a non-existent ndarray method).
    if len(landmarks.shape) == 2:
        landmarks = np.expand_dims(landmarks, axis=0)

    assert len(landmarks.shape) == 3, "[addNoise]: addNoise landmarks shape not as per required, got {}".format(landmarks.shape)

    noisyLm = []
    for landmark in landmarks:
        for _ in range(samplesPerLm):
            noisyLm.append([
                [
                    coord[0] + np.random.randint(-noiseLimit, noiseLimit) + np.random.random(),
                    coord[1] + np.random.randint(-noiseLimit, noiseLimit) + np.random.random(),
                ]
                for coord in landmark
            ])
    return np.asarray(noisyLm)

def addNoiseSingle(landmarks, noiseLimit):
    """Return one noisy copy of every landmark set.

    Every x and y coordinate is shifted by an integer drawn from
    ``[-noiseLimit, noiseLimit)`` plus a fraction drawn from ``[0, 1)``.
    Only the first two values of each point are kept.

    Args:
        landmarks: list or ndarray holding a batch of landmark sets (3-D),
            or a single (21,2) landmark set, which is treated as a batch of one.
        noiseLimit: bound of the integer part of the noise.

    Returns:
        ndarray with one noisy landmark set per input landmark set.

    Raises:
        AssertionError: if the input is not 3-D after the optional promotion.
    """
    if type(landmarks) == list:
        landmarks = np.asarray(landmarks)

    # Promote a single (21,2) landmark set BEFORE the rank check; the original
    # order made this branch unreachable (and called a non-existent method).
    if landmarks.shape == (21, 2):
        landmarks = np.expand_dims(landmarks, axis=0)

    assert len(landmarks.shape) == 3, "[addNoiseSingle]: addNoise landmarks shape not as per required, got {}".format(landmarks.shape)

    noisyLm = []
    for landmark in landmarks:
        noisyLm.append([
            [
                coord[0] + np.random.randint(-noiseLimit, noiseLimit) + np.random.random(),
                coord[1] + np.random.randint(-noiseLimit, noiseLimit) + np.random.random(),
            ]
            for coord in landmark
        ])
    return np.asarray(noisyLm)

# Rotate one (21,2) landmark set by @rad radians around a pivot point
def rotate(coords, rad, point=None):
    """Rotate every point of ``coords`` by ``rad`` radians.

    When ``point`` is None the pivot is the centre of the bounding box of
    ``coords`` (which are first cast to float32); otherwise ``point`` is used
    as the ``(cx, cy)`` pivot.

    Returns a new (21,2) ndarray; raises AssertionError for any other shape.
    """
    assert coords.shape == (21,2), "coords isn't 2D"

    if point is not None:
        cx, cy = point
    else:
        coords = coords.astype(np.float32)
        xs, ys = coords[:,0], coords[:,1]
        left, top = min(xs), min(ys)
        width, height = max(xs) - left, max(ys) - top
        cx, cy = left+(width/2), top+(height/2)

    sinA = np.sin(rad)
    cosA = np.cos(rad)
    rotated = [
        [
            cx+((px-cx)*cosA) - ((py-cy)*sinA),
            cy+((px-cx)*sinA) + ((py-cy)*cosA),
        ]
        for px, py in coords
    ]
    return np.asarray(rotated)

def rotateAll(landmarks, rad, point=None):
    """Rotate a batch of landmark sets with ``rotate``.

    Args:
        landmarks: list or ndarray shaped (21,2), (N,21,2) or flattened (N,42).
        rad: a single angle in radians applied to every landmark set, or a
            1-D ndarray with one angle per landmark set (paired with ``zip``,
            so surplus entries on either side are ignored).
        point: optional ``(cx, cy)`` pivot forwarded to ``rotate``.

    Returns:
        ndarray of rotated landmark sets.

    Raises:
        AssertionError: on an unsupported type or shape of ``landmarks``.
        TypeError: if ``rad`` is neither an ndarray nor a scalar.
    """
    if type(landmarks) == list:
        landmarks = np.asarray(landmarks)
    assert type(landmarks) == np.ndarray, "Got incompetable type for landmarks: " + str(type(landmarks))

    shape = landmarks.shape
    if shape == (21,2):
        landmarks = np.expand_dims(landmarks,axis=0)
        shape = landmarks.shape

    # The original compared shape[-2:] with the int 42, which is never true,
    # so flattened (N,42) input could not pass this check.
    isBatch = len(shape) == 3 and shape[-2:] == (21,2)
    isFlatBatch = len(shape) == 2 and shape[-1] == 42
    assert isBatch or isFlatBatch, "Got landmarks with incompetable shape {}".format(shape)

    if isFlatBatch:
        landmarks = landmarks.reshape((-1,21,2))
        print("[rotateAll]: Reshaping landmarks to (21,2)")

    if type(rad) is np.ndarray and rad.ndim > 0:
        rotatedLms = [rotate(landmark, r, point) for landmark, r in zip(landmarks, rad)]
    elif np.shape(rad) == ():
        # Plain scalars and 0-d arrays: one angle for every landmark set
        rotatedLms = [rotate(landmark, rad, point) for landmark in landmarks]
    else:
        raise TypeError("Parameter rad is nither array nor single value")
    return np.asarray(rotatedLms)
        
def generateSamples(gesLandmarks, samplesPerLm, rotate_range=7, noise_threshold=2, preprocess=False):
    """Augment base gestures by rotation and noise and label the results.

    Every base landmark set is rotated by each whole degree in
    ``[-rotate_range, rotate_range)`` and each rotated copy is then passed to
    ``addNoise``. The label of a sample is the index of the base landmark set
    it was generated from.

    Args:
        gesLandmarks: path to a CSV file, a list, or an ndarray of landmarks.
            Anything that is not already (N,21,2) is reshaped to (-1,21,2).
        samplesPerLm: requested number of samples per base landmark set; the
            number of noisy copies per rotation is
            ``int(samplesPerLm / (2*rotate_range)) + 1``.
        rotate_range: rotation range in degrees (must be non-zero).
        noise_threshold: ``noiseLimit`` forwarded to ``addNoise``.
        preprocess: when True the samples are passed through
            ``preprocessLm(..., normalize=True, trainData=True)``.

    Returns:
        ``(samples, labels)`` where ``labels`` is an int16 vector with one
        entry per sample.
    """
    oneRad = np.pi / 180
    compatibleTypes = [str,list,np.ndarray]
    assert type(gesLandmarks) in compatibleTypes,"[generateSamples]: pass either path to landmarks of landmarks nothing else"

    if type(gesLandmarks) == str:
        x = pd.read_csv(gesLandmarks).to_numpy().astype(np.float32)
    elif type(gesLandmarks) == list:
        x = np.array(gesLandmarks)
    else:
        x = gesLandmarks

    # Flattened input (e.g. rows of 42 values read from CSV) -> (N,21,2)
    if x.ndim != 3 or x.shape[-2:] != (21,2):
        x = x.reshape((-1,21,2))

    rotatedLm = np.array([
        rotate(lm, degree*oneRad)
        for lm in x
        for degree in range(-rotate_range, rotate_range)
    ])

    # To compensate the additional samples generated in rotating landmarks
    samplesPerLm = int(samplesPerLm / (rotate_range*2)) + 1
    noisyLm = addNoise(samplesPerLm, rotatedLm, noise_threshold)

    # Samples are laid out gesture by gesture, so each gesture owns one
    # contiguous run. The original doubled this run length, which merged the
    # labels of neighbouring gestures and left the tail of the array at 0.
    uniqueGes = x.shape[0]
    samplesPerGes = noisyLm.shape[0] // uniqueGes
    print("samplesPerGes"+str(samplesPerGes))
    print("uniqueGes"+str(uniqueGes))
    labels = np.repeat(np.arange(uniqueGes, dtype=np.int16), samplesPerGes)

    print("Samples generated has shape " , noisyLm.shape, len(labels))
    if preprocess:
        noisyLm = preprocessLm(noisyLm, normalize=True,trainData=True)
    return noisyLm, labels

# Shift each landmark set so its min x and min y are 0, then rescale its bounding box
def preprocessLm(landmarks, ratio=0.3, normalize=False, maxdim=(480,640), trainData=False):
    """Relocate and rescale landmark sets.

    Each landmark set is translated so that its smallest x and y become 0 and
    scaled so that its bounding box becomes ``480*ratio`` by ``640*ratio``.

    Args:
        landmarks: list or ndarray; a 2-D input gets a leading batch axis and
            a trailing dimension of 42 is reshaped to (21,2).
        ratio: fraction of the fixed 480 x 640 frame the bounding box fills.
        normalize: when True the result is passed to ``normalizeLm`` together
            with ``maxdim``.
        maxdim: divisor pair forwarded to ``normalizeLm``.
        trainData: when True the result is flattened to (N,42).

    Returns:
        ndarray shaped (N,21,2), or (N,42) when ``trainData`` is True.

    Raises:
        AssertionError: if the landmarks are not 3-D after the reshaping above.
    """
    if(type(landmarks) == list):
        landmarks = np.asarray(landmarks)
    if(len(landmarks.shape) == 2):
        landmarks = np.asarray([landmarks])
    if(np.shape(landmarks)[-1] == 42):
        landmarks = np.reshape(landmarks, (-1,21,2))
        print("[normalizeLm]: Got landmarks of shape 42. Reshaping to (21,2)")

    assert len(landmarks.shape) == 3, "[preprocessLm]: Shape of landmarks array is not 3"

    finalratio = np.array([480*ratio, 640*ratio])

    # Per-landmark-set bounding box, kept broadcastable against (N,21,2)
    mn = landmarks.min(axis=1, keepdims=True)
    mx = landmarks.max(axis=1, keepdims=True)
    normLm = (landmarks - mn) * (finalratio / (mx - mn))

    if normalize:
        normLm = normalizeLm(normLm, maxdim)
    if trainData:
        # normLm is always an ndarray here; the original called .reshape on a
        # Python list whenever normalize was False.
        normLm = np.asarray(normLm).reshape((-1,42))
    return np.asarray(normLm)

def perprocessLm2(landmarks,noiseRange,rads,normalize=False):
    """Rotate landmarks, add noise, and optionally normalise the result.

    The original body referenced the undefined names ``normalize`` and
    ``AddNoiseSingle`` and normalised only when ``normalize`` was falsy, so it
    could never run. ``normalize`` is now an optional parameter.

    Args:
        landmarks: landmark sets accepted by ``rotateAll``.
        noiseRange: ``noiseLimit`` forwarded to ``addNoiseSingle``.
        rads: rotation angle(s) forwarded to ``rotateAll``.
        normalize: when True the result is passed through ``normalizeLm``.

    Returns:
        ndarray of augmented landmark sets.
    """
    augmented = addNoiseSingle(rotateAll(landmarks, rads), noiseRange)
    if normalize:
        return normalizeLm(augmented)
    return augmented

# Pick N random samples (drawn with replacement) from the given landmarks
def getNLms(lms,n,normalized=False, ratio=0.3):
    """Return ``n`` randomly chosen landmark sets from ``lms``.

    Indices come from ``np.random.choice(len(lms), n)``, so the same sample
    may be picked more than once.

    Args:
        lms: list or ndarray of landmark sets.
        n: number of samples to draw; must not exceed ``len(lms)``.
        normalized: when True the picked samples are passed through
            ``preprocessLm`` with ``ratio``.
        ratio: ``ratio`` forwarded to ``preprocessLm``.

    Returns:
        ndarray with the picked (and optionally preprocessed) samples.

    Raises:
        AssertionError: if ``n`` is larger than ``len(lms)``.
    """
    assert len(lms) >= n, "[getNLms]: N can't be larger than total samples"
    # Convert first: a plain Python list cannot be indexed with an index array
    picked = np.asarray(lms)[np.random.choice(len(lms), n)]
    if normalized:
        return preprocessLm(picked, ratio)
    return picked
def normalizeLm(landmarks,maxdim=(480,640)):
    """Divide every landmark set by ``maxdim`` and return the stacked result.

    ``landmarks`` must be a list or ndarray with three dimensions; each
    element is divided by ``maxdim`` on its own.
    """
    assert type(landmarks) in (list, np.ndarray), "[normalizeLm]: landmarks should be list of numpy array"
    rank = len(np.shape(landmarks))
    assert rank == 3, "[normalizeLm]: landmarks don't have 3 dimensions"
    scaled = [singleLm/maxdim for singleLm in landmarks]
    return np.asarray(scaled)

def mulTuple(tup):
    """Return the product of all items of a tuple or list (1 when empty)."""
    assert type(tup) in (tuple, list),"pass tuple of list"
    product = 1
    for factor in tup:
        product *= factor
    return product

# Draw one landmark set (21 points of [x, y]) onto the given image
def impersonateLm(final, data):
    """Draw the 21 landmark points and their connecting lines on ``final``.

    Each point is drawn as a circle and every index pair in ``connections``
    is joined with a line; coordinates are truncated to int before drawing.
    Returns the image the drawing calls were applied to.
    """
    for idx in range(21):
        centre = (int(data[idx][0]), int(data[idx][1]))
        final = cv.circle(final, centre, 3, (0,0,255), 3)

    for startIdx, endIdx in connections:
        x0, y0 = data[startIdx]
        x1, y1 = data[endIdx]
        startPt = (int(x0), int(y0))
        endPt = (int(x1), int(y1))
        cv.line(final, startPt, endPt, (255,255,0), 2)
    return final

# Shows a single landmark set in a window; closes all windows if destroyWindow is true.
# This function does not wait for a key press - the caller has to call cv.waitKey.
def showLm(landmark, destroyWindow=False, windowName = "landmark"):
    """Draw one (21,2) landmark set on a black 480x640 canvas and show it.

    Args:
        landmark: ndarray of shape (21,2).
        destroyWindow: when True all OpenCV windows are destroyed right after
            the image has been handed to ``cv.imshow``.
        windowName: name of the OpenCV window.

    Raises:
        AssertionError: if ``landmark`` is not shaped (21,2).
    """
    landmarkShape = (21,2)
    actualShape = landmark.shape
    assert actualShape == landmarkShape, "[showLm]: Landmark shape isn't (21,2)"
    blackBack = np.zeros((480,640,3))
    cv.imshow(windowName,impersonateLm(blackBack, landmark))
    if destroyWindow:
        # Was spelled `destoryAllWindows`, which raised AttributeError
        cv.destroyAllWindows()
        
# Show every landmark set passed in, one after another
def showAllLm(landmarks):
    """Display landmark sets one by one, waiting for a key press after each.

    A 3-D input is treated as a batch of landmark sets; anything else is
    handed to ``showLm`` as a single landmark set. All windows are destroyed
    at the end.
    """
    if type(landmarks) == list:
        landmarks = np.asarray(landmarks)

    toShow = landmarks if len(landmarks.shape) == 3 else [landmarks]
    for singleLm in toShow:
        showLm(singleLm)
        cv.waitKey(0)
    cv.destroyAllWindows()

# Shows pairs of landmark sets side by side in two separate windows
# (pass an array shaped (N,2,21,2) or (2,N,21,2))
def showLmPairs(pairs, winName=["first","second"]):
    """Display landmark pairs, waiting for a key press after each pair.

    ``pairs`` must be a 4-D list or ndarray. The (N,2,21,2) layout is checked
    first; otherwise the (2,N,21,2) layout is tried. The two members of a
    pair are shown in the windows named by ``winName``.
    """
    assert type(pairs) in (list, np.ndarray),"[showLmPairs]: pass either list or ndarray"
    if type(pairs) == list:
        pairs  = np.asarray(pairs)
    assert len(pairs.shape) == 4, "[showPairs]: pairs not in proper shape"

    firstWin, secondWin = winName[0], winName[1]
    if pairs.shape[-3:] == (2,21,2):
        for lmPair in pairs:
            showLm(lmPair[0],windowName=firstWin)
            showLm(lmPair[1],windowName=secondWin)
            cv.waitKey(0)
    elif pairs.shape[0] == 2 and pairs.shape[-2:] == (21,2):
        for firstLm, secondLm in zip(pairs[0], pairs[1]):
            showLm(firstLm,windowName=firstWin)
            showLm(secondLm,windowName=secondWin)
            cv.waitKey(0)
    cv.destroyAllWindows()

def showLandmark(landmarks):
    """Show landmarks, dispatching on the shape of the input.

    Accepts a CSV path, a single (21,2) landmark set, a batch of landmark
    sets, a batch of landmark pairs, or any of these with the last axis
    flattened to 42. 3-D data goes to showAllLm and 4-D data to showLmPairs.

    Raises:
        AssertionError: if the data is not 3-D or 4-D with trailing (21,2).
    """
    if type(landmarks) == str:
        # A string is treated as the path of a CSV file holding landmarks
        import pandas as pd
        landmarks = pd.read_csv(landmarks).to_numpy().reshape(-1,21,2)
    
    shape = np.shape(landmarks)
    lmShape = (21,2)
    lmPairShape = (2,21,2)  # not used below
    trainLmShape = (42)  # not used below; note this is the int 42, not a tuple
    
    # Flattened landmarks: split the trailing 42 into (21,2)
    if shape[-1] == 42:
        newShape = shape[:-1]+lmShape
        landmarks = np.reshape(landmarks,newShape)
        shape = newShape

    # A single landmark set becomes a batch of one
    if shape == lmShape:
        landmarks = np.expand_dims(landmarks,axis=0)
        shape = landmarks.shape
        print(landmarks.shape)
        
    # Heuristic: if the element count of landmarks[0][0] exceeds the sum of its
    # values, the data is taken to be normalized and is scaled by (480,640).
    if mulTuple(np.shape(landmarks[0][0])) > np.sum(landmarks[0][0]):
        print("[showLandmark]: Noramlized Landmarks are Passed")
        landmarks=np.multiply(landmarks,(480,640))
    assert (len(shape) == 3 or len(shape) == 4) and shape[-2:]==lmShape,"[showLandmark]: pass either landmarks or pair of landmarks. {} isn't valid".format(shape)
    if type(landmarks) == list:
        landmarks = np.asarray(landmarks)
    print("[showLandmark] {}".format(landmarks.shape))
    
    # 4-D data is shown as pairs, 3-D data as individual landmark sets
    if len(shape) == 4:
#         if shape[-3:]==lmPairShape:
        print("Array of Pairs")
        showLmPairs(landmarks)

    elif len(shape) == 3 and shape[-2:]==lmShape:
        print("Array")
        showAllLm(landmarks)
        
def imposeLmPairs(pairs,windowName="ImposedLandmarks"):
    """Draw both members of every landmark pair on one canvas and show it.

    The original assigned the reshaped, expanded and rescaled data to an
    unrelated ``landmarks`` variable and used the undefined name ``lmShape``,
    so those steps either raised NameError or had no effect. Everything now
    operates on ``pairs``.

    Args:
        pairs: a single pair (2,21,2), a batch (N,2,21,2), or a batch in
            (2,N,21,2) layout; the last axis may also be flattened to 42.
            Any 4-D input whose first axis has length 2 is read as the
            (2,N,21,2) layout, as in the original condition.
        windowName: name of the OpenCV window used for every pair.

    Raises:
        AssertionError: if the data cannot be brought to (N,2,21,2).
    """
    lmShape = (21,2)
    lmPairShape = (2,21,2)

    pairs = np.asarray(pairs)
    shape = pairs.shape

    # Flattened landmarks: split the trailing 42 into (21,2)
    if shape[-1] == 42:
        pairs = np.reshape(pairs, shape[:-1]+lmShape)
        shape = pairs.shape

    # A single pair becomes a batch of one
    if shape == lmPairShape:
        pairs = np.expand_dims(pairs,axis=0)
        shape = pairs.shape
        print(pairs.shape)

    if len(shape) == 4 and shape[0] == 2 and shape[-2:]==lmShape:
        pairs = np.stack((pairs[0], pairs[1]), axis=1)
        shape = pairs.shape
        print("[imposeLmPairs]: (2,N,21,2) converted to (N,2,21,2)")

    assert len(shape) == 4 and shape[-3:]==lmPairShape,"[imposeLmPairs]: pass pairs of landmarks. {} isn't valid".format(shape)

    # Heuristic: a first point whose coordinates sum to less than the element
    # count of a landmark set (42) is taken to mean normalized data.
    if shape[0] > 0 and pairs[0][0].size > np.sum(pairs[0][0][0]):
        print("[imposeLmPairs]: Noramlized Landmarks are Passed")
        pairs = np.multiply(pairs,(480,640))

    for lmPair in pairs:
        canvas = np.zeros((480,640,3))
        canvas = impersonateLm(canvas, lmPair[0])
        canvas = impersonateLm(canvas, lmPair[1])
        cv.imshow(windowName,canvas)
        cv.waitKey(0)
    cv.destroyAllWindows()
    
    
def getLmFromCsv(path,feedModel=True):
    """Read landmarks from the CSV at ``path`` and run them through preprocessLm.

    ``feedModel`` is forwarded as both ``normalize`` and ``trainData``. The
    result gets an extra axis inserted at position 1.
    """
    import pandas as pd
    rawRows = pd.read_csv(path).to_numpy()
    processed = preprocessLm(rawRows, normalize=feedModel, trainData=feedModel)
    return np.expand_dims(processed, axis=1)

In [8]:
import numpy as np
import pandas as pd

class pairGenerator:
    """Builds same/different gesture pairs from landmarks stored in a CSV file.

    The landmark array is assumed to be grouped by gesture: the samples of
    gesture ``g`` occupy rows ``g*samplesPerGes`` to ``(g+1)*samplesPerGes - 1``.
    """

    def __init__(self,path="garbage/finalFeedGes.csv",noOfGes=9,relocate=True,normalize=False):
        """Load landmarks from ``path`` and optionally preprocess them.

        Args:
            path: CSV file whose content is reshaped to (-1,21,2).
            noOfGes: number of distinct gestures in the file.
            relocate: when True the landmarks are passed through preprocessLm.
            normalize: forwarded to preprocessLm (only used when relocating).
        """
        self.landmarks = pd.read_csv(path).to_numpy().reshape((-1,21,2))
        self.samplesPerGes = int(self.landmarks.shape[0]/noOfGes)
        print(self.samplesPerGes)
        self.noOfGes = noOfGes
        if relocate:
            self.landmarks = preprocessLm(self.landmarks,normalize=normalize)
        print("Landmarks is loaded from csv file "+path+" with shape of array " + str(self.landmarks.shape))

    def generatePairs(self,landmarks=None,batchSize=32,trainData=False):
        """Draw a shuffled batch of landmark pairs.

        Half of the batch (label 1) pairs two samples of the same gesture, the
        other half (label 0) pairs samples of two different gestures. The
        original hard-coded 9 gestures of 2000 samples each and could select
        the same gesture for a label-0 pair through index wrap-around; group
        sizes now come from ``self.noOfGes`` and the landmark array itself.

        Args:
            landmarks: 3-D list or ndarray grouped by gesture; defaults to
                ``self.landmarks``.
            batchSize: number of pairs.
            trainData: when True both sides are flattened to (batchSize,42).

        Returns:
            ``([first, second], labels)`` where labels is an int16 vector.

        Raises:
            AssertionError: if ``landmarks`` is not a 3-D list or ndarray.
            ValueError: if fewer than two gestures or no samples are available.
        """
        if landmarks is None:
            landmarks = self.landmarks
        isSequence = type(landmarks)==list or type(landmarks)==np.ndarray
        if type(landmarks)==list:
            # Convert before touching .shape (lists have none)
            landmarks = np.asarray(landmarks)
        assert isSequence and len(landmarks.shape)==3,"[generatePairs]: Either shape isn't 3 Dim or landmarks is not list or numpy array"

        noOfGes = self.noOfGes
        samplesPerGes = landmarks.shape[0] // noOfGes
        if noOfGes < 2 or samplesPerGes < 1:
            raise ValueError("[generatePairs]: need at least 2 gestures with at least 1 sample each")

        labels = np.zeros((batchSize,), dtype=np.int16)
        labels[batchSize//2:]=1
        samePair = labels == 1

        firstGes = np.random.randint(0, noOfGes, batchSize)
        firstOffset = np.random.randint(0, samplesPerGes, batchSize)
        # Adding a shift in [1, noOfGes) modulo noOfGes never lands on firstGes
        otherGes = (firstGes + np.random.randint(1, noOfGes, batchSize)) % noOfGes
        secondGes = np.where(samePair, firstGes, otherGes)
        secondOffset = np.random.randint(0, samplesPerGes, batchSize)

        firstInd = firstGes*samplesPerGes + firstOffset
        secondInd = secondGes*samplesPerGes + secondOffset

        # One shared permutation keeps both sides and the labels aligned
        order = np.random.permutation(batchSize)
        gesPairs = [
            landmarks[firstInd[order]].astype(np.float64),
            landmarks[secondInd[order]].astype(np.float64),
        ]
        labels = labels[order]

        if trainData:
            gesPairs[0] = gesPairs[0].reshape((batchSize,42))
            gesPairs[1] = gesPairs[1].reshape((batchSize,42))
        return gesPairs,labels

    def generator(self,batchSize=32,re=False):
        """Yield batches from generatePairs forever.

        ``re`` is passed on as ``trainData``. The original call passed
        ``batchSize`` in the ``landmarks`` position together with an unknown
        ``re`` keyword, which raised TypeError on the first batch.
        """
        while True:
            yield self.generatePairs(batchSize=batchSize,trainData=re)

    def makeOneShotTask(self,N,trainData=False):
        """Build an N-way one-shot task.

        One randomly chosen gesture is the subject. The comparison set holds
        one other sample of that gesture (target 1) and ``N-1`` samples of
        other gestures (target 0), in shuffled order. Needs
        ``N <= self.samplesPerGes`` and ``self.samplesPerGes >= 2`` because
        sample indices are drawn without replacement.

        Returns:
            ``([mainGes, comparingSet], targets)``; ``mainGes`` repeats the
            subject sample N times. With ``trainData`` both are (N,42).

        Raises:
            AssertionError: if ``N`` is less than 1.
            ValueError: if distractors are requested but only one gesture exists.
        """
        assert N > 0, "[oneShotTask]: N cannot be less than 1"
        if N > 1 and self.noOfGes < 2:
            raise ValueError("[oneShotTask]: need at least 2 gestures to build distractors")

        samplesPerGes = self.samplesPerGes
        trueCat = np.random.randint(0, self.noOfGes)
        cat = np.full(N, trueCat, dtype=np.int64)
        if N > 1:
            # The original modulo arithmetic could yield -1, which wraps around
            # to the last gesture and may coincide with the true category.
            cat[1:] = (trueCat + np.random.randint(1, self.noOfGes, size=N-1)) % self.noOfGes

        indices = np.random.choice(samplesPerGes,size=N,replace=False)
        ex1,ex2 = np.random.choice(samplesPerGes,size=2,replace=False)

        subject = self.landmarks[(trueCat*samplesPerGes)+ex1,:,:]
        mainGes = np.repeat(subject[np.newaxis], N, axis=0)
        comparingSet = self.landmarks[(cat*samplesPerGes)+indices,:,:]
        comparingSet[0] = self.landmarks[(trueCat*samplesPerGes)+ex2,:,:]
        targets = np.zeros((N),dtype=np.int16)
        targets[0] = 1

        order = np.random.permutation(N)
        comparingSet, targets = comparingSet[order], targets[order]

        if trainData:
            comparingSet = comparingSet.reshape((N,42))
            mainGes = mainGes.reshape((N,42))
        pairs=[mainGes,comparingSet]
        return pairs,targets

    def evaluate(self,model,N=8,k=50,reshape=True):
        """Run ``k`` N-way one-shot tasks and report the accuracy of ``model``.

        A task counts as correct when the argmax of ``model.predict(inputs)``
        matches the position of the true pair.

        Returns:
            Fraction of correct tasks in [0, 1]. The printed value is this
            fraction times 100, matching the "%" in the message.
        """
        n_correct = 0
        for _ in range(k):
            inputs, labels = self.makeOneShotTask(N,trainData=reshape)
            pred = model.predict(inputs)
            if np.argmax(pred) == np.argmax(labels):
                n_correct += 1
        persentage = n_correct/k
        print("Got {}% correct".format(persentage*100))
        return persentage

In [26]:
class Generator2():
    """Generates augmented gesture pairs from one raw landmark set per gesture.

    Every row of the CSV file is one gesture; pairs are produced by rotating
    the raw landmark sets and adding noise to them.
    """

    def __init__(self, BasicGesturePath="garbage/GestureProject/RawGestures/RelocatedRawGes10R.csv"):
        """Load the raw gestures from ``BasicGesturePath``."""
        import pandas as pd
        # `%run gesLmUtilsLib.ipynb` written as the plain-Python call IPython
        # turns it into; the magic itself is a SyntaxError outside IPython.
        # Without an IPython shell the helper functions must already be defined.
        try:
            shell = get_ipython()
        except NameError:
            shell = None
        if shell is not None:
            shell.run_line_magic("run", "gesLmUtilsLib.ipynb")
#         Lm should always has to be in (N,21,2) shape and reshape to (N,42) whenever needed
        self.rawLm = pd.read_csv(BasicGesturePath).to_numpy().reshape((-1,21,2))
        self.lmForPrediction = self.rawLm.reshape((-1,1,42))
        self.oneRad = np.pi / 180
        self.uniqueGes = self.rawLm.shape[0]

    def show(self):
        """Display the raw gestures with showLandmark."""
        showLandmark(self.rawLm)

#     Calling from generator will automatically reshape (21,2) into (42)
    def generator(self,BatchSize=64,lmShape=(21,2),rotateRange=5,NoiseRange=5):
        """Yield training batches from generatePairs forever."""
        while True:
            yield self.generatePairs(BatchSize,lmShape,rotateRange,NoiseRange,True)

    def generatePairs(self,BatchSize,lmShape=(21,2),rotateRange=5,NoiseRange=5,forTraining=False):
        """Create a shuffled batch of augmented gesture pairs.

        The first half of the batch (label 1) pairs a gesture with itself, the
        second half (label 0) pairs it with a different gesture. Both sides
        are rotated by independent random whole-degree angles in
        ``[-rotateRange, rotateRange)`` and then perturbed by addNoiseSingle.
        The original hard-coded 10 gestures and could select the same gesture
        for a label-0 pair through index wrap-around.

        Args:
            BatchSize: number of pairs.
            lmShape: must be a tuple; otherwise unused.
            rotateRange: rotation range in degrees.
            NoiseRange: ``noiseLimit`` forwarded to addNoiseSingle.
            forTraining: when True returns ``([first, second], labels)`` with
                both sides flattened to (N,42); otherwise returns
                ``(pairs, labels)`` with pairs shaped (N,2,21,2).

        Raises:
            AssertionError: if ``lmShape`` is not a tuple.
            ValueError: if different-gesture pairs are needed but fewer than
                two gestures are loaded.
        """
        # str() is required: concatenating str and type raised TypeError
        assert type(lmShape) == tuple,"lmShape parameter is expected to be tuple, instead got " + str(type(lmShape))

        # `np.int` no longer exists in NumPy; the builtin int is what it aliased
        labels = np.zeros(BatchSize,dtype = int)
        labels[0:BatchSize//2] = 1
        differentPair = labels == 0
        nDifferent = int(differentPair.sum())
        if nDifferent and self.uniqueGes < 2:
            raise ValueError("[generatePairs]: need at least 2 gestures to build different pairs")

        rotateLmRads = np.random.randint(-rotateRange,rotateRange,(BatchSize*2))*self.oneRad

        gesIds = np.random.randint(0, self.uniqueGes, BatchSize)
        counterIds = gesIds.copy()
        if nDifferent:
            # A shift in [1, uniqueGes) modulo uniqueGes never returns gesIds
            shift = np.random.randint(1, self.uniqueGes, nDifferent)
            counterIds[differentPair] = (gesIds[differentPair] + shift) % self.uniqueGes

#         Roatate Landmarks
        firstLm = rotateAll(self.rawLm[gesIds],rad=rotateLmRads[0:BatchSize])
        counterLm = rotateAll(self.rawLm[counterIds],rad=rotateLmRads[BatchSize:])

#         Add Noise
        firstLm = addNoiseSingle(firstLm,NoiseRange)
        counterLm = addNoiseSingle(counterLm,NoiseRange)

#         Shuffle the samples and labels with one shared permutation
        order = np.random.permutation(BatchSize)
        firstLm, counterLm, labels = firstLm[order], counterLm[order], labels[order]

#         Why landmarks are not normalized
        if forTraining:
            firstLm = np.reshape(firstLm,(-1,42))
            counterLm = np.reshape(counterLm,(-1,42))
            return ([firstLm,counterLm],labels)
        return np.stack((firstLm, counterLm), axis=1), labels

    def makeOneShotTask(self,N,gesInd=None):
        """Build an N-way one-shot task around one gesture.

        The comparison set holds the subject gesture itself (label 1) plus
        ``N-1`` randomly chosen other gestures (label 0). Subject and
        comparison gestures are rotated by random angles within +/-10 degrees
        and perturbed by addNoiseSingle with a limit of 2.

        Args:
            N: size of the comparison set.
            gesInd: index of the subject gesture; random when None.

        Returns:
            ``(subjectGes, compGes, labels)`` with ``subjectGes`` shaped
            (1,42) and ``compGes`` shaped (N,1,42), in shuffled order.
        """
        if gesInd is None:
            gesInd = np.random.randint(0,self.uniqueGes)

        gesIndList = np.delete(np.arange(self.uniqueGes),gesInd)

        compIds = [gesInd]
        for _ in range(N-1):
            compIds.append(np.random.choice(gesIndList))

        rad = np.random.randint(-10,10,N)*self.oneRad
        radS = np.random.randint(-10,10)*self.oneRad

        subjectGes = rotateAll(self.rawLm[gesInd],radS)
        compGes = rotateAll(self.rawLm[compIds],rad)

        subjectGes = addNoiseSingle(subjectGes,2)
        compGes = addNoiseSingle(compGes,2)

        labels = np.zeros(N,dtype=np.int16)
        labels[0] = 1
        order = np.random.permutation(N)
        compGes, labels = compGes[order], labels[order]

        subjectGes = np.reshape(subjectGes,(-1,42))
        compGes = np.reshape(compGes,(-1,42))
        compGes = np.expand_dims(compGes, axis=1)
        return subjectGes, compGes, labels

    def evaluateModel(self,model,N,iterations=100,gesInd=None):
        """Run ``iterations`` one-shot tasks and print the accuracy of ``model``.

        ``model.predict`` is called once per comparison gesture with
        ``[subject, candidate]``; a task is correct when the highest score
        belongs to the candidate labelled 1. The accuracy is also returned
        (the original only printed it).
        """
        correct = 0
        for _ in range(iterations):
            sGes,mGes,labels = self.makeOneShotTask(N,gesInd)
            res = [model.predict([sGes,lm]) for lm in mGes]
            if np.argmax(res) == np.argmax(labels):
                correct += 1
        accuracy = correct/iterations
        print(accuracy)
        return accuracy

    def predict(self,model,landmark):
        """Return the index of the raw gesture that ``model`` scores highest.

        ``landmark`` may be shaped (42,), in which case a batch axis is added;
        it is compared against every entry of ``self.lmForPrediction``.
        """
        if landmark.shape == (42,):
            landmark = np.expand_dims(landmark,axis=0)
        res = [model.predict([lm,landmark]) for lm in self.lmForPrediction]
        return np.argmax(res)