In [7]:
import json
import os
import os.path as path
from torch.utils.data import Dataset
import random

import numpy as np
from skimage import io, transform
from torchvision import transforms as torch_transform
import torch
from torch.autograd import Variable
import torch.nn.functional as F

from PIL import Image

# Folder that holds the JSON metadata files for the dress category.
DRESS_FOLDER = "../data/json/"

# File names inside DRESS_FOLDER: the retrieval records and the test/train pair lists.
ID_NAME = "retrieval_dresses.json"
TEST_NAME = "test_pairs_dresses.json"
TRAIN_NAME = "train_pairs_dresses.json"
# Output file that receives the photo-list lines belonging to the dress subset.
PHOTO_FILE = "../data/photos/photos_dress.txt"
# One seed shared by every random number generator seeded below.
SEED = 17

# Seed torch, numpy and the stdlib RNG so random choices repeat between runs.
torch.manual_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)
from PIL import ImageFile
# Ask PIL to load truncated image files instead of raising an error on them.
ImageFile.LOAD_TRUNCATED_IMAGES = True

<h1>Step 1: Download dataset metadata and photo info</h1>

In [6]:
!git clone https://github.com/pumpikano/street2shop
!mkdir street2shop/images
!bash street2shop/get_street2shop.sh
!mkdir data
!mv street2shop/meta data

Cloning into 'street2shop'...
remote: Counting objects: 9, done.[K
remote: Compressing objects: 100% (7/7), done.[K
Unpacking objects: 100% (9/9), done.
remote: Total 9 (delta 1), reused 9 (delta 1), pack-reused 0[K
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:--  0:00:02 --:--:--     0^C
mv: rename street2shop/meta to data/meta: No such file or directory


<h1>Step 2: Extract street photos only</h1>

In [8]:
import json

def read_json(file_name):
    """Load and return the JSON document stored in ``file_name``.

    The whole file is parsed rather than only its first line, so both
    single-line and pretty-printed (multi-line) JSON files are accepted.

    Args:
        file_name: path of the JSON file to read.

    Returns:
        The decoded Python object (list, dict, ...).

    Raises:
        IOError / OSError: if the file cannot be opened.
        ValueError: if the content is not valid JSON.
    """
    with open(file_name) as f:
        return json.load(f)


def get_photo_ids(obj, retr_dict):
    """Collect every photo id referenced by a list of pair records.

    For each record in ``obj`` the record's own ``"photo"`` value is taken,
    together with all values found in ``retr_dict`` under the record's
    ``"product"`` key.

    Args:
        obj: iterable of dicts that each have "photo" and "product" keys.
        retr_dict: mapping from product to an iterable of photo ids.

    Returns:
        A set with all collected photo ids.

    Raises:
        KeyError: if a record's product is not a key of ``retr_dict``.
    """
    collected = set()
    for record in obj:
        own_photo = {record["photo"]}
        collected |= own_photo | set(retr_dict[record["product"]])
    return collected




In [9]:
# NOTE(review): inside a notebook __name__ == "__main__", so with `!=` this
# block never runs there. The guard is kept unchanged in case it is a
# deliberate way of disabling the cell -- confirm before flipping it to `==`.
if __name__ != "__main__":
    # Group the retrieval records: product -> list of all its photo ids.
    id_to_photo_dirty = read_json(DRESS_FOLDER+ID_NAME)
    id_to_photo_clean = {}
    for item in id_to_photo_dirty:
        id_to_photo_clean.setdefault(item["product"], []).append(item["photo"])
    test_dress = read_json(DRESS_FOLDER + TEST_NAME)
    train_dress = read_json(DRESS_FOLDER + TRAIN_NAME)
    test_set = get_photo_ids(test_dress, id_to_photo_clean)
    train_set = get_photo_ids(train_dress, id_to_photo_clean)
    # Every photo id referenced by either the test or the train pairs.
    result_set = test_set|train_set

    # Open the input first so that a missing photos.txt does not leave a
    # freshly truncated PHOTO_FILE behind.
    with open("../data/photos/photos.txt") as ph_file:
        with open(PHOTO_FILE, "w") as new_ph_file:
            for line in ph_file:
                if not line.strip():
                    # A blank line has no id to parse; int("") would raise.
                    continue
                # The first comma-separated field is the photo id. It is kept
                # in `photo_id` rather than `id` to avoid shadowing the builtin.
                photo_id = int(line.split(",")[0])
                if photo_id in result_set:
                    new_ph_file.write(line)

In [10]:
# Runs download.py on the dress photo list (presumably to fetch the images).
# NOTE(review): the path given here is data/photos/photos_dress.txt while
# PHOTO_FILE is "../data/photos/photos_dress.txt" -- confirm which working
# directory this command is meant to run from.
!python download.py --urls data/photos/photos_dress.txt

SyntaxError: invalid syntax (<ipython-input-10-29ab3a227978>, line 1)

In [11]:
import os
import os.path as path

# Folder that contains the downloaded image files.
PHOTO_FOLDER = "../data/images"

# Integer ids of the image files found in PHOTO_FOLDER; the id is the part of
# the file name before the first dot. Names whose stem is not purely digits
# (for example hidden files such as ".DS_Store") are skipped instead of
# crashing int().
ALL_IMAGES = [
    int(name.split(".")[0])
    for name in os.listdir(PHOTO_FOLDER)
    if path.isfile(PHOTO_FOLDER+"/"+name) and name.split(".")[0].isdigit()
]



Form the target dataset

In [12]:
# Folder that contains the downloaded image files.
PHOTO_FOLDER = "../data/images"
# Folder with the dataset metadata (not referenced by the code visible in this notebook section).
META_FOLDER = "../data/meta"
from torch.utils.data import Dataset
import random
from skimage import io, transform

# Default side length (pixels) images are resized to; the model below uses
# torchvision's inception_v3, which is documented for 299x299 inputs.
IMG_SIZE =299

def default_image_loader(path, crop=None,size=None):
    """Open an image file and return it as a resized RGB PIL image.

    Args:
        path: location of the image file handed to ``Image.open``.
        crop: optional box passed to ``Image.crop`` before resizing.
        size: target size passed to ``Image.resize``; when omitted a square
            of ``IMG_SIZE`` x ``IMG_SIZE`` is used.

    Returns:
        The converted, optionally cropped and resized image.
    """
    target_size = (IMG_SIZE, IMG_SIZE) if size is None else size
    rgb_image = Image.open(path).convert('RGB')
    region = rgb_image if crop is None else rgb_image.crop(crop)
    return region.resize(target_size)

def get_negative(positive, items):
    """Return a randomly chosen element of ``items`` that differs from ``positive``.

    Elements are drawn with ``random.choice`` until one compares unequal to
    ``positive``.

    Args:
        positive: value the result must not be equal to.
        items: non-empty sequence to sample from.

    Returns:
        An element of ``items`` for which ``positive != element`` holds.

    Raises:
        IndexError: if ``items`` is empty (raised by ``random.choice``).
        ValueError: if every element of ``items`` equals ``positive``; without
            this check the sampling loop would never terminate.
    """
    # any() stops at the first differing element, so this guard is cheap in
    # the normal case and consumes no random numbers.
    if len(items) > 0 and not any(positive != candidate for candidate in items):
        raise ValueError("items contains no element different from positive")
    while True:
        rand_item = random.choice(items)
        if positive != rand_item:
            return rand_item
    

def get_image(photo_id, folder=PHOTO_FOLDER, crop=None,transform=None):
    """Load the image stored for ``photo_id`` from ``folder``.

    The file name is the id left-padded with zeros to nine characters,
    followed by the first of ".jpeg", ".png", ".gif" for which a file exists.

    Args:
        photo_id: id of the photo; converted with ``str``.
        folder: directory searched for the image file.
        crop: optional crop box forwarded to ``default_image_loader``.
        transform: optional callable applied to the loaded image.

    Returns:
        The loaded image, or ``transform(image)`` when a transform is given.

    Raises:
        Exception: if no file with any of the three extensions exists.
    """
    stem = folder + "/" + str(photo_id).rjust(9, "0")
    candidates = (stem + suffix for suffix in (".jpeg", ".png", ".gif"))
    # Stops probing the file system at the first existing candidate.
    full_photo_name = next((name for name in candidates if path.isfile(name)), None)
    if full_photo_name is None:
        raise Exception("No Image found")
    picture = default_image_loader(full_photo_name, crop=crop)
    if transform is not None:
        return transform(picture)
    return picture

class DressDataset(Dataset):
    """street2shop dress dataset yielding (positive, anchor, negative) images.

    Each usable pair record becomes a triplet dict with the keys "anchor"
    (the record's photo), "positive" (the photo registered for the record's
    product in the retrieval file), "negative" (a random id from ALL_IMAGES
    that differs from the positive) and the record's remaining keys such as
    "bbox". Negatives are drawn once, at construction time.
    """

    def __init__(self, triplets_file_name, photos_folder_name=PHOTO_FOLDER, 
                 retrieval_file_name=DRESS_FOLDER+ID_NAME, transform = None):
        """Build the triplet list.

        Args:
            triplets_file_name: JSON file with the pair records; each record
                has "photo", "product" and "bbox" entries.
            photos_folder_name: folder the images are loaded from.
            retrieval_file_name: JSON file with product/photo records.
            transform: optional callable applied to every loaded image.

        Pairs whose product has no retrieval record, or whose anchor or
        positive photo is not listed in ALL_IMAGES, are dropped. Note that
        ALL_IMAGES is computed from the module-level PHOTO_FOLDER.
        """
        pairs = read_json(triplets_file_name)
        # product -> photo; a later record for the same product overwrites an
        # earlier one, so exactly one photo is kept per product.
        self.retrieval = {it["product"]: it["photo"] for it in read_json(retrieval_file_name)}
        # A set makes each availability check constant-time instead of a scan
        # over the whole ALL_IMAGES list for every pair.
        available = set(ALL_IMAGES)
        self.triplets = []
        for item in pairs:
            # .get() turns an unknown product into None, which is never
            # available, so such pairs are skipped like any other unusable pair.
            positive_photo = self.retrieval.get(item["product"])
            anchor_photo = item["photo"]
            if positive_photo not in available or anchor_photo not in available:
                continue
            item["positive"] = positive_photo
            item["anchor"] = anchor_photo
            del item["product"]
            del item["photo"]
            # get_negative needs a sequence for random.choice, hence the list.
            item["negative"] = get_negative(positive_photo, ALL_IMAGES)
            self.triplets.append(item)

        self.photos_folder = photos_folder_name
        self.transform = transform

    def __len__(self):
        """Return the number of usable triplets."""
        return len(self.triplets)

    def __getitem__(self, index):
        """Load the images of one triplet.

        Returns:
            A tuple ``(positive, anchor, negative)``; the anchor is cropped to
            the record's bounding box, and every image goes through
            ``self.transform`` when one is set.
        """
        triplet = self.triplets[index]
        bbox = triplet["bbox"]
        # Crop box as (left, upper, right, lower), derived from the stored
        # left/top/width/height values.
        crop_box = (bbox["left"], bbox["top"],
                    bbox["left"]+bbox["width"], bbox["top"]+bbox["height"])
        # Honour the folder given to the constructor instead of silently
        # falling back to get_image's default.
        positive = get_image(triplet["positive"], folder=self.photos_folder,
                             transform=self.transform)
        negative = get_image(triplet["negative"], folder=self.photos_folder,
                             transform=self.transform)
        anchor = get_image(triplet["anchor"], folder=self.photos_folder,
                           crop=crop_box, transform=self.transform)
        return (positive, anchor, negative)


In [13]:
# Training triplets, with images converted to tensors.
train = DressDataset(triplets_file_name=DRESS_FOLDER+TRAIN_NAME,
                     transform=torch_transform.Compose([
                                               torch_transform.ToTensor()
                                           ]))
# Built from TEST_NAME so the evaluation data is distinct from the training pairs.
test = DressDataset(triplets_file_name=DRESS_FOLDER+TEST_NAME,
                   transform=torch_transform.Compose([
                                               torch_transform.ToTensor()
                                           ]))


In [14]:
import torch

# Number of triplets per batch.
BATCH_SIZE = 10

# Worker / pinned-memory options are only passed when CUDA is available.
cuda = torch.cuda.is_available()
kwargs = {}
if cuda:
    kwargs = dict(num_workers=1, pin_memory=True)
train_dataset_loader = torch.utils.data.DataLoader(
    dataset=train, batch_size=BATCH_SIZE, shuffle=True, **kwargs)
test_dataset_loader = torch.utils.data.DataLoader(
    dataset=test, batch_size=BATCH_SIZE, shuffle=True, **kwargs)

<h1>Define the model</h1>

In [39]:
import torch.nn as nn
import torchvision.models as models


class BasisNet(nn.Module):
    """Embedding network: frozen pretrained Inception v3 plus a trainable head.

    All pretrained Inception parameters are frozen. The replaced ``fc`` layer
    (-> 512) and the ``final`` layer (512 -> ``embedding_size``) are created
    afterwards and therefore stay trainable. The output is L2-normalised
    along dimension 1.
    """

    def __init__(self, embedding_size=128):
        """Create the network.

        Args:
            embedding_size: number of features in the produced embedding.
        """
        super(BasisNet, self).__init__()
        self.inception = models.inception_v3(pretrained=True)
        # Freeze before swapping in the new fc layer, so that only the
        # pretrained weights are excluded from training.
        for param in self.inception.parameters():
            param.requires_grad=False
        num_ftrs = self.inception.fc.in_features
        self.inception.fc = nn.Linear(num_ftrs, 512)
        self.elu = nn.ELU()
        self.final = nn.Linear(512, embedding_size)

    def forward(self,x):
        """Return the L2-normalised embeddings for a batch ``x``."""
        features = self.inception(x)
        # In training mode (with aux logits) Inception v3 returns a tuple whose
        # first entry is the main output; in eval mode it returns a plain
        # tensor. Indexing unconditionally with [0] would then select the first
        # sample of the batch, so only unpack when a tuple is actually returned.
        if isinstance(features, tuple):
            features = features[0]
        features = self.elu(features)
        features = self.final(features)
        return self.l2_norm(features)

    def l2_norm(self,inp):
        """Divide every row of the 2-D tensor ``inp`` by its Euclidean norm.

        A small constant (1e-10) is added to the squared norm before the
        square root so that an all-zero row does not divide by zero.
        """
        input_size = inp.size()
        squared_norm = torch.sum(torch.pow(inp, 2), 1) + 1e-10
        norm = torch.sqrt(squared_norm)

        # Reshape the per-row norms into a column and broadcast them over the
        # feature dimension.
        normalised = torch.div(inp, norm.view(-1, 1).expand_as(inp))

        return normalised.view(input_size)
        
class TripletNet(nn.Module):
    """Applies one shared embedding network to each member of a triplet."""

    def __init__(self, embedding_net):
        """Store ``embedding_net``, the module used to embed every input."""
        nn.Module.__init__(self)
        self.embedding_net = embedding_net

    def forward(self, positive, anchor, negative):
        """Embed the three inputs and return the results in argument order."""
        embed = self.embedding_net
        return embed(positive), embed(anchor), embed(negative)

    def get_embedding(self, x):
        """Embed a single input with the shared network."""
        embed = self.embedding_net
        return embed(x)

In [40]:
class TripletLoss(nn.Module):
    """
    Hinge-style triplet loss on squared Euclidean distances.
    For each row the loss is relu(d(anchor, positive) - d(anchor, negative) + margin),
    where d is the sum of squared differences along dimension 1.
    """

    def __init__(self, margin):
        nn.Module.__init__(self)
        self.margin = margin

    def forward(self, anchor, positive, negative, size_average=True):
        """Return the mean (or, if ``size_average`` is false, the sum) of the per-row losses."""
        to_positive = torch.sum(torch.pow(anchor - positive, 2), 1)
        to_negative = torch.sum(torch.pow(anchor - negative, 2), 1)
        hinge = F.relu(to_positive - to_negative + self.margin)
        if size_average:
            return hinge.mean()
        return hinge.sum()
    
    
# Triplet model whose shared embedding network is a BasisNet with default arguments.
dress_net = TripletNet(BasisNet())

In [None]:
import torch.optim as optim

# Only parameters that still require gradients (the new layers of the head)
# are handed to the optimizer.
optimizer = optim.SGD(filter(lambda p: p.requires_grad,dress_net.parameters()), lr=0.001, momentum=0.9)
criterion = TripletLoss(0.1) 
EPOCH = 2
for epoch in range(EPOCH):
    losses = []
    for batch_i, (positive, anchor, negative) in enumerate(train_dataset_loader):
        positive, anchor, negative = Variable(positive), Variable(anchor), Variable(negative)
        # dress_net takes (positive, anchor, negative) and returns the
        # embeddings in that same order; the criterion takes
        # (anchor, positive, negative). Keeping the names aligned with both
        # signatures computes the same loss as before, without relying on
        # two argument swaps cancelling each other out.
        positive_emb, anchor_emb, negative_emb = dress_net(positive, anchor, negative)
        loss = criterion(anchor_emb, positive_emb, negative_emb)
        # .item() is the scalar accessor on newer PyTorch; older releases
        # only support indexing the underlying data.
        losses.append(loss.item() if hasattr(loss, "item") else loss.data[0])
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # print() with a single argument behaves the same on Python 2 and 3.
        print(batch_i)
    # Mean training loss of the epoch.
    print(np.mean(losses))

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
