In [1]:
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms
import numpy as np
import pandas as pd
import skimage
import os
import pickle
from scipy.spatial.transform import Rotation
import PIL

# Number of anchor points used for the ShopFacade data (passed to define_anchors
# and embedded in the output file names further down).
nAnchors = 16
# Batch size and epoch count are not referenced in this part of the notebook —
# presumably consumed by a later training step (TODO confirm).
batchSize = 24
numEpochs = 3
# Device string; not referenced in this part of the notebook (TODO confirm usage).
device = "cpu"
# Shared, seeded NumPy generator. define_anchors, prep_kaggle_data and
# randomly_alternate all draw from it, so their results depend on call order.
rng = np.random.default_rng(12345)

In [2]:
def extract_feats(model, df, fileDir, size=(224,224), outfile=None, nImages=330, numBool = True):
    """Run every image listed in ``df`` through ``model.features`` and store the
    globally average-pooled feature vector per row.

    model: torch.nn.Module exposing a ``features`` sub-module (e.g. a torchvision
        DenseNet). It is switched to eval mode for the duration of the call so
        BatchNorm uses its stored statistics; the previous mode is restored
        afterwards.
    df: DataFrame with an "ImageFile" column holding paths relative to ``fileDir``.
        It is not modified.
    fileDir: directory the "ImageFile" paths are relative to.
    size: (height, width) every image is resized to before the forward pass.
    outfile: optional path; if given, the result is pickled there.
    nImages: upper bound on the frame number inside one sequence; only its number
        of digits is used, to build "ImageNum".
    numBool: if True, parse sequence and frame number out of file names shaped
        like "seq<k>/frame<n>.<ext>", store ``10**digits * k + n`` in an
        "ImageNum" column and sort the result by it.

    returns: copy of ``df`` with an object column "Features" (one 1-D numpy array
        per row) and, if ``numBool``, the float column "ImageNum".
    """
    newDf = df.copy()
    nRows = newDf.shape[0]

    # Results are collected positionally and attached once at the end; the
    # feature length is whatever the model produces rather than a fixed 2208.
    feats = np.empty(nRows, dtype=object)
    imageNums = np.zeros(nRows)

    # define new index for images where first significant digit is
    # sequence number, thus it has to have more overall digits
    # than max image number.
    indexer = 10**len(str(nImages))

    res = torchvision.transforms.Resize(size)

    wasTraining = model.training
    model.eval()
    try:
        # no_grad: pure inference, no autograd graph needed
        with torch.no_grad():
            for pos, imageFile in enumerate(df["ImageFile"]):

                if numBool:
                    name = imageFile.split(".")[0]
                    seq = int(name.split("/")[0].split("q")[-1])
                    frame = int(name.split("frame")[-1])
                    imageNums[pos] = indexer*seq + frame

                pixels = skimage.io.imread(os.path.join(fileDir, imageFile))
                if pixels.ndim == 2:
                    # grayscale file: replicate the single channel to get RGB
                    pixels = np.stack([pixels, pixels, pixels], axis=-1)
                elif pixels.shape[2] == 4:
                    # RGBA file: drop the alpha channel
                    pixels = pixels[:, :, :3]

                # HWC -> CHW, then resize to the requested input size
                im = res(torch.from_numpy(np.ascontiguousarray(pixels)).permute(2, 0, 1))

                # extract features and put through GAP layer (mean over the
                # spatial axes, independent of the chosen input size)
                x = model.features(im.unsqueeze(0).float())
                feats[pos] = x.mean(dim=(2, 3)).squeeze(0).numpy()
    finally:
        model.train(wasTraining)

    newDf["Features"] = feats

    if numBool:
        newDf["ImageNum"] = imageNums
        newDf = newDf.sort_values("ImageNum")

    if outfile:
        # use pickle format to make sure values of numpy array
        # are saved with full accuracy (saving as txt only yields
        # around 9 significant digits, pickle saves bit representation)
        newDf.to_pickle(outfile)

    return newDf

def define_anchors(df, nAnchors, outfile=None, randAnchors = False):
    """Pick ``nAnchors`` anchor positions from ``df`` and store, for every row,
    the 2-D offsets from that row's (X, Y) to each anchor.

    df: DataFrame with numeric "X" and "Y" columns. It is not modified.
    nAnchors: number of anchors to pick.
    outfile: optional path; if given, the resulting frame is pickled there.
    randAnchors: if True, anchors are rows drawn without replacement via the
        module-level ``rng``; otherwise they are rows at evenly spaced
        *positions* (row order, not index labels) through the frame.

    returns: (newDf, myAnchors) where ``newDf`` is a copy of ``df`` (index
        unchanged) with an object column "AnchorDists" holding one
        (nAnchors, 2) array ``myAnchors - [X, Y]`` per row, and ``myAnchors``
        is the (nAnchors, 2) array of anchor coordinates. If the random branch
        finds fewer rows than ``nAnchors``, the surplus anchors stay at (0, 0).
    """
    newDf = df.copy()
    nRows = newDf.shape[0]

    positions = newDf[["X", "Y"]].to_numpy(dtype=float)

    if randAnchors:
        anchIdx = rng.permutation(nRows)[:nAnchors]
    else:
        anchIdx = np.floor(np.linspace(0, nRows, nAnchors, endpoint=False)).astype(int)

    # Positional lookup: the frame usually arrives sorted or shuffled, so its
    # index labels do not coincide with row positions.
    myAnchors = np.zeros((nAnchors, 2))
    myAnchors[:len(anchIdx)] = positions[anchIdx]

    # One (nAnchors, 2) offset array per row, kept in an object column.
    dists = np.empty(nRows, dtype=object)
    for pos, xy in enumerate(positions):
        dists[pos] = myAnchors - xy
    newDf["AnchorDists"] = dists

    if outfile:
        newDf.to_pickle(outfile)

    return newDf, myAnchors

def anchors_for_testSet(df, myAnchors, outfile=None):
    """Attach the offsets to a fixed set of anchors to every row of ``df``.

    df: DataFrame with "X" and "Y" columns. It is not modified.
    myAnchors: array of shape (nAnchors, 2) with the anchor coordinates.
    outfile: optional path; if given, the resulting frame is pickled there.

    returns: copy of ``df`` with an object column "AnchorDists" holding
        ``myAnchors - [X, Y]`` for each row.
    """
    # anchors are only defined on the training set, so they are passed in here
    result = df.copy()

    nRows, anchorCount = result.shape[0], myAnchors.shape[0]
    placeholder = np.zeros((nRows, anchorCount, 2)).tolist()
    result.loc[:, "AnchorDists"] = placeholder
    result.loc[:, "AnchorDists"] = result.loc[:, "AnchorDists"].astype(object)

    for label in result.index:
        here = np.array([result.loc[label, "X"], result.loc[label, "Y"]])
        result.at[label, "AnchorDists"] = myAnchors - here

    if outfile:
        result.to_pickle(outfile)

    return result

def prep_kaggle_data(location, model, nAnchors, trainTestSplit = 0.9):
    """Build train/test frames with features and anchor offsets for one scene.

    Reads ``calibration.csv`` (columns "image_id", "translation_vector" and
    "rotation_matrix", the latter two as space-separated numbers) from
    ``location``, scales the translations by the scene's entry in
    ``./kaggle-data/train/scaling_factors.csv`` (columns "scene",
    "scaling_factor"), converts the rotation matrices to quaternions, splits the
    rows randomly with the module-level ``rng``, and runs both parts through
    extract_feats and the anchor helpers. Both frames are pickled into
    ``location``.

    location: scene directory; its last path component is the scene name. A
        trailing slash is optional.
    model: feature extractor passed on to extract_feats.
    nAnchors: number of randomly drawn anchors for this scene.
    trainTestSplit: fraction of rows that goes into the training part.

    returns: (kaggleTrain, kaggleTest, anch) — the two processed frames and the
        (nAnchors, 2) anchor array. Existing callers that ignore the return
        value are unaffected.
    """
    scene = os.path.basename(os.path.normpath(location))
    imageDir = os.path.join(location, "images/")

    kaggleAll = pd.read_csv(os.path.join(location, "calibration.csv"))
    sc = pd.read_csv("./kaggle-data/train/scaling_factors.csv")
    sc.index = sc.loc[:,"scene"]
    scale = sc.loc[scene, "scaling_factor"]

    translations = np.array(
        [t.split(" ") for t in kaggleAll.loc[:,"translation_vector"]], dtype=float
    ).reshape(-1, 3) * scale

    rotMats = np.array(
        [r.split(" ") for r in kaggleAll.loc[:,"rotation_matrix"]], dtype=float
    ).reshape(-1, 3, 3)
    # scipy returns quaternions scalar-last, i.e. as (x, y, z, w)
    quats = Rotation.from_matrix(rotMats).as_quat()

    # W is the scalar part and P, Q, R the vector part, matching the
    # "X Y Z W P Q R" layout used for the ShopFacade data in this notebook.
    kaggleAll = pd.DataFrame(data={"ImageFile": (kaggleAll.loc[:,"image_id"] + ".jpg").tolist(),
                                "X": translations[:,0],
                                "Y": translations[:,1],
                                "Z": translations[:,2],
                                "W": quats[:,3],
                                "P": quats[:,0],
                                "Q": quats[:,1],
                                "R": quats[:,2]})

    mySplit = rng.permutation(kaggleAll.shape[0])
    lastIndex = int(np.ceil(trainTestSplit * kaggleAll.shape[0]))

    kaggleTrain = kaggleAll.iloc[mySplit[:lastIndex]]
    kaggleTest = kaggleAll.iloc[mySplit[lastIndex:]]

    # numBool=False: file names here are not of the "seq<k>/frame<n>" form
    kaggleTrain = extract_feats(model, kaggleTrain, imageDir, numBool=False)
    kaggleTrain, anch = define_anchors(
        kaggleTrain, nAnchors,
        outfile=os.path.join(location, f"traindata_with_features_and_anchors{nAnchors}.pkl"),
        randAnchors=True)

    # the test part reuses the anchors drawn from the training part
    kaggleTest = extract_feats(model, kaggleTest, imageDir, numBool=False)
    kaggleTest = anchors_for_testSet(
        kaggleTest, anch,
        outfile=os.path.join(location, f"testdata_with_features_and_anchors{nAnchors}.pkl"))

    return kaggleTrain, kaggleTest, anch

def high_covis_filter(fileDir, quantile=0.1, droppedImages=None):

    """
    Score every image by its mean covisibility over all pairs it takes part in
    and drop the lowest-scoring fraction.

    fileDir: str to directory containing "pair_covisibility.csv" with the columns
        "pair" (formatted "<image0>-<image1>") and "covisibility"
    quantile: fraction of images that will be dropped
    droppedImages: images that have been dropped in previous steps; pairs
        involving them are ignored. The passed list is not modified.

    returns: (avgCovis, fDrops) where avgCovis is a DataFrame with the columns
        "image" and "avgCovisibility" for the images that were kept, and fDrops
        is ``droppedImages`` followed by the images dropped in this call.
    """
    fDrops = list(droppedImages) if droppedImages is not None else []

    df = pd.read_csv(os.path.join(fileDir, "pair_covisibility.csv"))

    names = df.loc[:,"pair"].str.split("-")
    pairs = pd.DataFrame({"name0": names.str[0],
                          "name1": names.str[1],
                          "covisibility": df.loc[:,"covisibility"].astype(float)})

    # delete entries of dropped images
    if fDrops:
        involved = pairs["name0"].isin(fDrops) | pairs["name1"].isin(fDrops)
        pairs = pairs[~involved]

    # One record per (image, pair) membership, so images that only ever appear
    # as the second member of a pair are scored as well. A pair of an image
    # with itself is counted once.
    asFirst = pairs[["name0", "covisibility"]].rename(columns={"name0": "image"})
    asSecond = pairs.loc[pairs["name1"] != pairs["name0"], ["name1", "covisibility"]].rename(columns={"name1": "image"})
    memberships = pd.concat([asFirst, asSecond], ignore_index=True)

    avgCovis = (memberships.groupby("image", sort=False)["covisibility"]
                .mean()
                .rename("avgCovisibility")
                .reset_index())

    threshold = avgCovis.avgCovisibility.quantile(quantile)
    fDrops += avgCovis[avgCovis.avgCovisibility < threshold].image.to_list()
    avgCovis = avgCovis[avgCovis.avgCovisibility >= threshold]

    print("done")

    return avgCovis, fDrops

def randomly_alternate(dfWcovis, fileDir, highest_covis_p = 0.5, occ_size = 0.2, new_pics = 50):
    """
    Write randomly augmented copies of high-covisibility images into ``fileDir``.

    For every new picture one image is drawn (module-level ``rng``) from the
    images whose average covisibility is at or above the ``highest_covis_p``
    quantile, three distinct transforms are drawn from a pool of six and applied
    in random order, and the result is saved as "<image>_aug<k>.jpg". The
    transforms themselves draw from torch's random state.

    dfWcovis: DataFrame with the columns "image" (file name without ".jpg") and
        "avgCovisibility", as returned by high_covis_filter
    fileDir: directory the source images are read from and the new ones written to
    highest_covis_p: quantile of avgCovisibility an image has to reach to be used
    occ_size: fraction of the image area blanked out by the erasing transform
    new_pics: number of augmented images to create

    returns: list with the base names (without ".jpg") of the files written
    """
    threshold = dfWcovis.avgCovisibility.quantile(highest_covis_p)
    df = dfWcovis[dfWcovis.avgCovisibility >= threshold].reset_index()

    if new_pics > 0 and df.shape[0] == 0:
        raise ValueError("no images at or above the requested covisibility quantile")

    # Transforms that do not depend on the image size. RandomErasing takes the
    # probability as its first positional argument, so the occlusion size has
    # to be passed as ``scale``.
    sizeFreeTrafos = [transforms.RandomErasing(scale=(occ_size, occ_size)),
                      transforms.RandomInvert(p=0.7),
                      transforms.RandomGrayscale(p=0.7),
                      transforms.RandomAdjustSharpness(0.5, p = 0.7),
                      transforms.RandomAutocontrast(0.5),
                      ]
    nTrafos = len(sizeFreeTrafos) + 1   # plus the per-image crop below

    created = []

    while len(created) < new_pics:
        idx = rng.integers(0, df.shape[0])
        perm = rng.permutation(nTrafos)

        baseName = df.loc[idx, "image"]
        im = torch.from_numpy(skimage.io.imread(os.path.join(fileDir, baseName + ".jpg")).transpose(2,0,1))

        # RandomCrop expects an output size in pixels; a bare 0.5 is truncated
        # to a 0x0 crop. Crop to half of each side of this image instead.
        halfCrop = transforms.RandomCrop((max(1, im.shape[1] // 2), max(1, im.shape[2] // 2)))
        pool = sizeFreeTrafos + [halfCrop]

        T = transforms.Compose([pool[k] for k in perm[:3]])
        im = T(im)

        newName = str(baseName) + "_aug" + str(len(created))

        # CHW -> HWC for PIL
        out = PIL.Image.fromarray(np.ascontiguousarray(im.numpy().transpose(1,2,0)))
        out.save(os.path.join(fileDir, newName+".jpg"))
        print("created new pics")
        created.append(newName)

    return created


In [3]:
# Score the sacre_coeur images by mean covisibility and drop the lowest 10 %.
# `dropped` (the list of dropped image names) is not used again in this notebook.
myDf, dropped = high_covis_filter("./kaggle-data/train/sacre_coeur/", 0.1)
# Writes 5 augmented JPEGs next to the originals in the images folder, so
# re-running this cell adds further files to the dataset directory.
randomly_alternate(myDf, "./kaggle-data/train/sacre_coeur/images", new_pics=5)

done
torch.Size([3, 772, 1036])
created new pics
torch.Size([3, 1086, 791])
created new pics
torch.Size([3, 690, 1059])
created new pics
torch.Size([3, 776, 1046])
created new pics
torch.Size([3, 608, 822])
created new pics


In [None]:
# DenseNet-161 with ImageNet weights serves as the feature extractor.
myModel = torchvision.models.densenet161(weights=torchvision.models.DenseNet161_Weights.IMAGENET1K_V1).float()

# scene directory -> number of anchors drawn for that scene
trainSets = {
    "./kaggle-data/train/brandenburg_gate/": 30,
    "./kaggle-data/train/sacre_coeur/": 25,
    "./kaggle-data/train/notre_dame_front_facade/": 82,
}

# 90 % of each scene goes into the training part, the rest into the test part
for location in trainSets:
    prep_kaggle_data(location, myModel, trainSets[location], 0.9)

In [None]:
# we use pretrained denseNet (DenseNet-161, ImageNet weights)
# NOTE(review): this repeats the model construction of the Kaggle cell above —
# presumably so the ShopFacade cells below can be run on their own.
myModel = torchvision.models.densenet161(weights=torchvision.models.DenseNet161_Weights.IMAGENET1K_V1).float()

In [None]:
# The header line is split on spaces, so its tokens do not line up with the
# data columns; the selection and rename below map each source column to the
# quantity it actually holds (file name, position X/Y/Z, quaternion W/P/Q/R).
trainData = pd.read_csv("./ShopFacade/dataset_train.txt", sep=" ", skiprows=1)
trainData = trainData[["ImageFile,", "Camera", "Position", "[X", "Y", "Z", "W", "P"]].rename(
    columns={"ImageFile,": "ImageFile",
             "Camera": "X",
             "Position": "Y",
             "[X": "Z",
             "Y": "W",
             "Z": "P",
             "W": "Q",
             "P": "R"})

# features first (sorted by sequence/frame number), then anchors on the sorted frame
trainData = extract_feats(
    myModel, trainData, "./ShopFacade/",
    outfile="./ShopFacade/traindata_with_features.pkl")
trainData, anch = define_anchors(
    trainData, nAnchors,
    outfile=f"./ShopFacade/traindata_with_features_and_anchors{nAnchors}.pkl")

# keep the anchor coordinates alongside the data
np.savetxt(f"./ShopFacade/anchors{nAnchors}.txt", anch)

In [None]:
# Same column mapping as for the training file: the space-split header tokens
# are shifted relative to the data columns they label.
testData = pd.read_csv("./ShopFacade/dataset_test.txt", sep=" ", skiprows=1)
testData = testData[["ImageFile,", "Camera", "Position", "[X", "Y", "Z", "W", "P"]].rename(
    columns={"ImageFile,": "ImageFile",
             "Camera": "X",
             "Position": "Y",
             "[X": "Z",
             "Y": "W",
             "Z": "P",
             "W": "Q",
             "P": "R"})

testData = extract_feats(
    myModel, testData, "./ShopFacade/",
    outfile="./ShopFacade/testdata_with_features.pkl")
# offsets are taken relative to the anchors chosen on the training set (`anch`)
testData = anchors_for_testSet(
    testData, anch,
    outfile=f"./ShopFacade/testdata_with_features_and_anchors{nAnchors}.pkl")