In [56]:
from colors import ColorsCorpusReader
from nltk.translate.bleu_score import corpus_bleu
import numpy as np
import os
from sklearn.model_selection import train_test_split
from torch_color_describer import (
    ContextualColorDescriber, create_example_dataset)
import utils
from utils import START_SYMBOL, END_SYMBOL, UNK_SYMBOL

In [57]:
utils.fix_random_seeds()

In [58]:
COLORS_SRC_FILENAME = os.path.join(
    "data", "colors", "filteredCorpus.csv")

In [59]:
dev_corpus = ColorsCorpusReader(
    COLORS_SRC_FILENAME,
    word_count=2,
    normalize_colors=True)

In [60]:
dev_examples = list(dev_corpus.read())

In [61]:
len(dev_examples)

13890

In [62]:
dev_rawcols, dev_texts = zip(*[[ex.colors, ex.contents] for ex in dev_examples])

In [63]:
dev_rawcols_train, dev_rawcols_test, dev_texts_train, dev_texts_test = \
    train_test_split(dev_rawcols, dev_texts)

##### Tokenizer

In [64]:
import string
import spacy
nlp = spacy.load('en_core_web_sm')

In [65]:
def get_single_words(corpus):
    all_words = []
    # remove the punctuation before splitting
    for s in corpus:
        for string in strip_punctuation(s.lower()).split():
            # remove suffixes from each word
            all_words.append(strip_suffix(string, ['er', 'est', 'ish']))
    
    return [w for w in all_words if all_words.count(w) <= 1]
    
def tokenize_example(s):

    s_strip = []
    # remove the punctuation before splitting
    for string in strip_punctuation(s.lower()).split():
        # remove suffixes from each word
        s_strip.append(strip_suffix(string, ['er', 'est', 'ish']))
    
    s_final = s_strip

    return [START_SYMBOL] + s_final + [END_SYMBOL]

def strip_suffix(s, suffixes):
    result = s
    for suffix in suffixes:
        result = s[:(len(s) - len(suffix))] if s.endswith(suffix) else s
        if result != s:
            break
            
    return result       

def strip_punctuation(s):
    punc = s.maketrans(dict.fromkeys(string.punctuation))
    return s.translate(punc)

In [66]:
dev_seqs_train = [tokenize_example(s) for s in dev_texts_train]

dev_seqs_test = [tokenize_example(s) for s in dev_texts_test]

In [67]:
dev_vocab = sorted({w for toks in dev_seqs_train for w in toks})

dev_vocab += [UNK_SYMBOL]

In [68]:
dev_vocab[:10]

['', '10', '2', '2nd', '6', '</s>', '<s>', 'a', 'again', 'ahaha']

In [69]:
len(dev_vocab)

987

##### Colours represnetation improvements

In [70]:
from itertools import product

In [71]:
def hsl_hsv(colour):
    """
    Converts a color from hsl format to hsv 
    https://en.wikipedia.org/wiki/HSL_and_HSV#HSV_to_HSL
    
    Parameters
    -------
    colour: a list of float
        Represent the three colours in HSL format.
    
    Returns
    -------
    typ: list of float
        The transformed to HSV format clour values.
     
    """
    H, S, L = colour
    H_v = H
    V = L + S * min(L, 1 - L)
    S_v = 0 if V == 0 else 2 * (1 - L/V)

    return [H_v, V, S_v]

In [72]:
hsv_t = hsl_hsv(dev_rawcols_train[0][0])
hsv_t

[0.19444444444444445, 0.165, 0.6666666666666667]

In [73]:
import cmath as cm

In [74]:
# nice source code https://github.com/futurulus/coop-nets/blob/01b1710b71358b224494d3329cc31b3cff9e10f6/vectorizers.py#L599
def represent_color_context(colors):

    result = []
    for hslcolour in colors:
        f_jkl = []
        for j, k, l in product((0, 1, 2), repeat=3):    
            h, s, v = hsl_hsv(hslcolour)
            f_jkl.append(
                cm.exp(-2j * cm.pi * (j*h/360 + k*s/200 + l*v/200))
            )

        real = []
        imag = []
        for f in f_jkl:
            real.append(f.real)
            imag.append(f.imag)
        result.append(np.transpose(real + imag))

    return result

In [75]:
represent_color_context(dev_rawcols_train[21])

[array([ 1.        ,  0.99978068,  0.99912283,  0.99982235,  0.99920834,
         0.99815605,  0.99928947,  0.99828099,  0.99683462,  0.99998654,
         0.99965859,  0.99889216,  0.99971112,  0.99898852,  0.99782774,
         0.99908051,  0.99796352,  0.99640879,  0.99994618,  0.9995096 ,
         0.9986346 ,  0.99957299,  0.99874182,  0.99747257,  0.99884466,
         0.9976192 ,  0.99595615,  0.        , -0.02094242, -0.04187565,
        -0.01884844, -0.03978301, -0.06070012, -0.03769018, -0.05860946,
        -0.07950302, -0.00518748, -0.02612848, -0.04705802, -0.02403475,
        -0.04496585, -0.06587722, -0.04287347, -0.06378723, -0.08467301,
        -0.01037483, -0.03131384, -0.05223913, -0.02922041, -0.05014748,
        -0.07105255, -0.04805561, -0.06896329, -0.08984073]),
 array([ 1.        ,  0.99978068,  0.99912283,  0.99973342,  0.99903063,
         0.99788963,  0.99893383,  0.99774794,  0.9961244 ,  0.99999921,
         0.99975349,  0.99906925,  0.99970352,  0.99897435,  0

In [76]:
dev_cols_train = [represent_color_context(colors) for colors in dev_rawcols_train]

dev_cols_test = [represent_color_context(colors) for colors in dev_rawcols_test]

##### Add GloVe embeddings

In [25]:
GLOVE_HOME = os.path.join('data', 'glove.6B')

In [26]:
def create_glove_embedding(vocab, glove_base_filename='glove.6B.50d.txt'):
    pass

    glove = utils.glove2dict(
        os.path.join(GLOVE_HOME, glove_base_filename)
    )

    embedding, vocab_ex = \
        utils.create_pretrained_embedding(glove, vocab)

    return embedding, vocab

In [27]:
dev_glove_embedding, dev_glove_vocab = create_glove_embedding(dev_vocab)

##### Develop the colour context

In [28]:
from torch_color_describer import Decoder, Encoder
import torch
import torch.nn as nn

In [None]:
class ColorContextLSTMEncoder(Encoder):
    def __init__(self, color_dim, *args, **kwargs):
        super().__init__(color_dim, *args, **kwargs)
        
        self.rnn = nn.LSTM(
            input_size=self.color_dim,
            hidden_size=self.hidden_dim,
            batch_first=True)

In [None]:
class ColorContextLSTMDecoder(Decoder):
    def __init__(self, color_dim, *args, **kwargs):
        self.color_dim = color_dim
        super().__init__(*args, **kwargs)
        
        self.rnn = nn.LSTM(
            input_size=self.embed_dim,
            hidden_size=self.hidden_dim,
            batch_first=True)


    def get_embeddings_ext(self, word_seqs, target_colors=None):
        """
        You can assume that `target_colors` is a tensor of shape
        (m, n), where m is the length of the batch (same as
        `word_seqs.shape[0]`) and n is the dimensionality of the
        color representations the model is using. The goal is
        to attach each color vector i to each of the tokens in
        the ith sequence of (the embedded version of) `word_seqs`.

        """
        ##### YOUR CODE HERE
        embedding_word_seqs = self.embedding(word_seqs)
        elen = embedding_word_seqs.shape[1]
        tc_extended = target_colors
        if len(target_colors.shape) < len(embedding_word_seqs.shape):
            tc_extended = torch.unsqueeze(target_colors, 1)
        repeat_vec = torch.LongTensor([elen/tc_extended.shape[1]])
        repeated_tc = torch.repeat_interleave(tc_extended, repeat_vec, dim=1)

        return torch.cat((embedding_word_seqs, repeated_tc), 2)

In [None]:
class ColorizedInputLSTMDescriber(ContextualColorDescriber):

    def build_graph(self):

        encoder = ColorContextLSTMEncoder(
            color_dim=self.color_dim,
            hidden_dim=self.hidden_dim)

        decoder = ColorContextLSTMDecoder(
            color_dim=self.color_dim,
            vocab_size=self.vocab_size,
            embedding=self.embedding,
            embed_dim=self.embed_dim,
            hidden_dim=self.hidden_dim)

        return ColorizedEncoderDecoder(
            encoder=encoder,
            decoder=decoder)

In [29]:
class ColorContextEncoder(Encoder):
    def __init__(self, color_dim, num_layers=2, *args, **kwargs):
        super().__init__(color_dim, *args, **kwargs)
        
        self.num_layers = num_layers
        self.rnn = nn.GRU(
            input_size=self.color_dim,
            hidden_size=self.hidden_dim,
            num_layers=self.num_layers,
            batch_first=True)
        

In [30]:
class ColorContextDecoder(Decoder):
    def __init__(self, color_dim, num_layers=2, *args, **kwargs):
        self.color_dim = color_dim
        super().__init__(*args, **kwargs)
        
        self.num_layers = num_layers
        self.rnn = nn.GRU(
            input_size=self.color_dim + self.embed_dim,
            hidden_size=self.hidden_dim,
            num_layers=self.num_layers,            
            batch_first=True)


    def get_embeddings(self, word_seqs, target_colors=None):
        """
        You can assume that `target_colors` is a tensor of shape
        (m, n), where m is the length of the batch (same as
        `word_seqs.shape[0]`) and n is the dimensionality of the
        color representations the model is using. The goal is
        to attach each color vector i to each of the tokens in
        the ith sequence of (the embedded version of) `word_seqs`.

        """
        ##### YOUR CODE HERE
        embedding_word_seqs = self.embedding(word_seqs)
        elen = embedding_word_seqs.shape[1]
        tc_extended = target_colors
        if len(target_colors.shape) < len(embedding_word_seqs.shape):
            tc_extended = torch.unsqueeze(target_colors, 1)
        repeat_vec = torch.LongTensor([elen/tc_extended.shape[1]])
        repeated_tc = torch.repeat_interleave(tc_extended, repeat_vec, dim=1)
        
        return torch.cat((embedding_word_seqs, repeated_tc), 2)

In [31]:
#def test_get_embeddings(decoder_class):
#     """
#     It's assumed that the input to this will be `ColorContextDecoder`.
#     You pass in the class, and the function initalizes it with the test
#     parameters.
#     """
# dec = decoder_class(
#     color_dim=3,   # For these, we mainly want *different*
#     vocab_size=10, # dimensions so that we reliably get
#     embed_dim=4,   # dimensionality errors if something
#     hidden_dim=5)  # isn't working.

# This step just changes the embedding to one with values
# that are easy to inspect and definitely will not change
# between runs:
embedding = nn.Embedding.from_pretrained(
    torch.FloatTensor([
        [10, 11, 12, 13],
        [14, 15, 16, 17],
        [18, 19, 20, 21]]))

# These are the incoming sequences -- lists of indices
# into the rows of `dec.embedding`:
word_seqs = torch.tensor([
    [0,1,2],
    [2,0,1]])

# Target colors as small floats that will be easy to track:
target_colors = torch.tensor([
    [0.1, 0.2, 0.3],
    [0.7, 0.8, 0.9]])

# The desired return value: one list of tensors for each of
# the two sequences in `word_seqs`. Each index is replaced
# with its vector from `dec.embedding` and has the
# corrresponding color from `target_colors` appended to it.
expected = torch.tensor([
    [[10., 11., 12., 13.,  0.1,  0.2,  0.3],
     [14., 15., 16., 17.,  0.1,  0.2,  0.3],
     [18., 19., 20., 21.,  0.1,  0.2,  0.3]],

    [[18., 19., 20., 21.,  0.7,  0.8,  0.9],
     [10., 11., 12., 13.,  0.7,  0.8,  0.9],
     [14., 15., 16., 17.,  0.7,  0.8,  0.9]]])

# result = dec.get_embeddings(word_seqs, target_colors=target_colors)

# assert expected.shape == result.shape, \
#     "Expected shape {}; got shape {}".format(expected.shape, result.shape)

# assert torch.all(expected.eq(result)), \
#     ("Your result has the desired shape but the values aren't correct. "
#      "Here's what your function creates; compare it with `expected` "
#      "from the test:\n{}".format(result))

In [32]:
embedding_word_seqs = embedding(word_seqs)
elen = embedding_word_seqs.shape[1]
tc_extended = target_colors
if len(target_colors.shape) < len(embedding_word_seqs.shape):
    tc_extended = torch.unsqueeze(target_colors, 1)
repeat_vec = torch.LongTensor([elen/tc_extended.shape[1]])
repeated_tc = torch.repeat_interleave(tc_extended, repeat_vec, dim=1)
result = torch.cat((embedding_word_seqs, repeated_tc), 2)

In [33]:
result

tensor([[[10.0000, 11.0000, 12.0000, 13.0000,  0.1000,  0.2000,  0.3000],
         [14.0000, 15.0000, 16.0000, 17.0000,  0.1000,  0.2000,  0.3000],
         [18.0000, 19.0000, 20.0000, 21.0000,  0.1000,  0.2000,  0.3000]],

        [[18.0000, 19.0000, 20.0000, 21.0000,  0.7000,  0.8000,  0.9000],
         [10.0000, 11.0000, 12.0000, 13.0000,  0.7000,  0.8000,  0.9000],
         [14.0000, 15.0000, 16.0000, 17.0000,  0.7000,  0.8000,  0.9000]]])

In [34]:
expected.shape

torch.Size([2, 3, 7])

In [36]:
from torch_color_describer import EncoderDecoder

In [37]:
class ColorizedEncoderDecoder(EncoderDecoder):

    def forward(self,
            color_seqs,
            word_seqs,
            seq_lengths=None,
            hidden=None,
            targets=None):
        
        if hidden is None:
            hidden = self.encoder(color_seqs)

        # Extract the target colors from `color_seqs` and
        # feed them to the decoder, which already has a
        # `target_colors` keyword.

        result = []
        for cs in color_seqs:
            result.append(np.array(cs[2]))
        t_colours = torch.tensor(result)

        output, hidden =  self.decoder(
            target_colors=t_colours,
            word_seqs=word_seqs,
            seq_lengths=seq_lengths,
            hidden=hidden)

        if self.training:
            return output
        else:
            return output, hidden

In [38]:
from torch_color_describer import Encoder

In [39]:
class ColorizedInputDescriber(ContextualColorDescriber):
    def __init__(self, *args, num_layers=2, **kwargs):
        self.num_layers = num_layers
        super().__init__(*args, **kwargs)
    
    def build_graph(self):

        encoder = ColorContextEncoder(
            color_dim=self.color_dim,
            hidden_dim=self.hidden_dim,
            num_layers=self.num_layers)

        decoder = ColorContextDecoder(
            color_dim=self.color_dim,
            vocab_size=self.vocab_size,
            embedding=self.embedding,
            embed_dim=self.embed_dim,
            hidden_dim=self.hidden_dim, 
            num_layers=self.num_layers)

        return ColorizedEncoderDecoder(
            encoder=encoder,
            decoder=decoder)

In [40]:
def test_full_system(describer_class):
    toy_color_seqs, toy_word_seqs, toy_vocab = create_example_dataset(
        group_size=50, vec_dim=2)

    toy_color_seqs_train, toy_color_seqs_test, toy_word_seqs_train, toy_word_seqs_test = \
        train_test_split(toy_color_seqs, toy_word_seqs)

    toy_mod = describer_class(toy_vocab)

    _ = toy_mod.fit(toy_color_seqs_train, toy_word_seqs_train)

    return toy_mod.listener_accuracy(toy_color_seqs_test, toy_word_seqs_test)

In [41]:
test_full_system(ColorizedInputDescriber)

Finished epoch 1000 of 1000; error is 0.11071182787418365

1.0

##### Test with different configurations

In [42]:
model = ColorizedInputDescriber(
    dev_glove_vocab,
    embedding=dev_glove_embedding,
    early_stopping=True,
    num_layers=3,
    eta=0.001)

In [43]:
model.fit(dev_cols_train, dev_seqs_train)

Stopping after epoch 19. Validation score did not improve by tol=1e-05 for more than 10 epochs. Final error is 39.94683003425598

ColorizedInputDescriber(
	batch_size=1028,
	max_iter=1000,
	eta=0.001,
	optimizer_class=<class 'torch.optim.adam.Adam'>,
	l2_strength=0,
	gradient_accumulation_steps=1,
	max_grad_norm=None,
	validation_fraction=0.1,
	early_stopping=True,
	n_iter_no_change=10,
	warm_start=False,
	tol=1e-05,
	hidden_dim=50,
	embed_dim=50,
	embedding=[[ 0.1394268  -0.47498924 -0.22497068 ... -0.2220264   0.13568444
  -0.13516782]
 [-0.14751     0.55556     1.0764     ... -0.3635      0.12941
   0.18798   ]
 [-0.11098     0.86724     0.78114    ... -0.61752     0.59103
   0.28649   ]
 ...
 [-0.23299    -0.5428     -0.4657     ...  0.070107    0.083831
   0.46851   ]
 [-1.5754      0.45398    -0.37413    ...  0.56113     0.24468
   0.43962   ]
 [ 0.23474262 -0.14531334  0.45088101 ... -0.42164843 -0.45073382
  -0.03124779]],
	freeze_embedding=False)

##### Model evaluation

In [44]:
def prepare_and_evaluate(trained_model, color_seqs_test, texts_test):
    
    # `word_seqs_test` is a list of strings, so tokenize each of
    # its elements:
    tok_seqs = [tokenize_example(s) for s in texts_test]

    col_seqs = [represent_color_context(colors)
                for colors in color_seqs_test]

    return trained_model.evaluate(col_seqs, tok_seqs)

In [45]:
prepare_and_evaluate(model, dev_rawcols_test, dev_texts_test)

{'listener_accuracy': 0.38180247624532104, 'corpus_bleu': 0.5094102047245922}