disco  
Copyright (C) 2022-present NAVER Corp.  
Creative Commons Attribution-NonCommercial-ShareAlike 4.0 license  

# Expressing Preferences via an EBM

We first have to express our preferences on the generated sequences. We do this via an EBM obtained by constraining a base model with scoring features.
We're going to reuse the "amazing" example from the [README](README.md) and go into a little more depth.

## Pointwise Constraint

Let's first revisit the amazing example from the [README](README.md).

With a `BooleanScorer`, it's straightforward to express that we want to have "amazing" in the sampled texts.

In [None]:
from disco.scorers import BooleanScorer

In [None]:
amazing_scorer = BooleanScorer(lambda s, c: "amazing" in s.text)

We can already (log-)score text samples, using a named tuple defined in disco to format them.

In [None]:
from disco.distributions.lm_distribution import TextSample

In [None]:
samples = [
        TextSample(list(), "This is quite amazing."),
        TextSample(list(), "This is amazingly relevant."),
        TextSample(list(), "This is the toolkit at work.")
    ]
amazing_scorer.log_score(samples, '')

1. The lists given as the first members of the tuples are empty because we don't need the tokenized form to score the samples with `amazing_scorer`.
1. We pass an empty context when (log-)scoring because the context is not relevant when looking for "amazing".

Yes, we've again simplified things a bit here, as we're not scoring the presence of "amazing" as a word but as a string. Can we do better?

In [None]:
import re

In [None]:
is_amazing = lambda s, c: bool(re.search(r"\bamazing\b", s.text))

In [None]:
amazing_scorer = BooleanScorer(is_amazing)

In [None]:
amazing_scorer.log_score(samples, '')
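
To see what changes between the two predicates without loading any model, here is a small standalone sketch that uses only the standard library. It assumes that a boolean feature scores 1 or 0, so that its log-score is 0 or minus infinity; the helper names are ours, not disco's.

```python
import math
import re

texts = [
    "This is quite amazing.",
    "This is amazingly relevant.",
    "This is the toolkit at work.",
]

def has_substring(text):
    # matches "amazing" anywhere, including inside "amazingly"
    return "amazing" in text

def has_word(text):
    # \b only matches at word boundaries, so "amazingly" is rejected
    return bool(re.search(r"\bamazing\b", text))

def boolean_log_score(flag):
    # assumption: a boolean feature scores 1 or 0, hence log-scores 0 or -inf
    return 0.0 if flag else -math.inf

substring_hits = [has_substring(t) for t in texts]
word_hits = [has_word(t) for t in texts]
word_log_scores = [boolean_log_score(h) for h in word_hits]
```

Only the second text is treated differently: it contains the string but not the word.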

Once we have this scorer, we can express our preference in an EBM.

We start by instantiating an `LMDistribution` as the base model:

In [None]:
from disco.distributions import LMDistribution

In [None]:
base = LMDistribution()

To specify that all our generated samples should include the word "amazing", we then use a straight product to define our target EBM:

In [None]:
target = base * amazing_scorer

Note that here we've only specified our preferences, or constraints: we would then have to approximate this EBM, for example by tuning a model. Although we've expressed that we want all our samples to include "amazing", a tuned model will only approach that constraint: in our experiments it reached about 80%.
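
To make the product concrete, here is a minimal sketch on a toy base model, assuming the EBM simply multiplies the base probability of a text by the 0/1 score of the feature. The four texts and their probabilities are made up for illustration, and no disco API is involved.

```python
import re

# hypothetical base model: a probability for each of four candidate texts
base_probs = {
    "It was amazing.": 0.1,
    "It was fine.": 0.5,
    "An amazing night.": 0.1,
    "It rained.": 0.3,
}

def has_amazing(text):
    return bool(re.search(r"\bamazing\b", text))

# unnormalized EBM: base probability times the boolean feature
unnormalized = {t: p * has_amazing(t) for t, p in base_probs.items()}

# normalizing gives the distribution that the EBM represents
partition = sum(unnormalized.values())
target_probs = {t: u / partition for t, u in unnormalized.items()}
```

On a real language model the partition function cannot be computed by enumeration like this, which is why the EBM has to be approximated.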

## Distributional Constraint

If we only want half of our samples to include "amazing", we have to constrain the base model, which computes the coefficient to use in the resulting EBM:

In [None]:
target = base.constrain([amazing_scorer], [1/2])

This works, but we've hidden something with this default syntax: the coefficients are computed for an empty context, which might not be what we want.

To specify a fixed context other than the empty string, we have to use a `SingleContextDistribution`:

In [None]:
from disco.distributions.single_context_distribution import SingleContextDistribution

In [None]:
incipit = "It was a cold and stormy night"

In [None]:
target = base.constrain([amazing_scorer], [1/2],
        n_samples=2**10,
        context_distribution=SingleContextDistribution(incipit))

And if we want to compute those coefficients for variable contexts, we can use a `ContextDistribution` to specify a text file listing them, one per line.

In [None]:
from disco.distributions.context_distribution import ContextDistribution

In [None]:
target = base.constrain([amazing_scorer], [1/2],
        n_samples=2**9,
        context_distribution=ContextDistribution("data/incipits.txt"), context_sampling_size=2**3)

If we peek inside the EBM we can check the coefficient that's been computed.

In [None]:
target.scorers[1].coefficients

With this knowledge, we could define our EBM directly.

In [None]:
from disco.scorers.exponential_scorer import ExponentialScorer

In [None]:
target = base * ExponentialScorer([is_amazing], [6])
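
For intuition about where such a coefficient comes from, here is a sketch for the simplest case: a single binary feature. It assumes that the EBM weights each text by `exp(coefficient * feature)`, in which case the coefficient has a closed form. The function names and the 1% base rate are hypothetical.

```python
import math

def logit(p):
    return math.log(p / (1.0 - p))

def binary_coefficient(target_rate, base_rate):
    # coefficient that moves the feature frequency from base_rate to target_rate
    return logit(target_rate) - logit(base_rate)

def tilted_rate(base_rate, coefficient):
    # feature frequency after reweighting texts by exp(coefficient * feature)
    weighted = base_rate * math.exp(coefficient)
    return weighted / (weighted + 1.0 - base_rate)

# hypothetical base model where 1% of the samples contain the word
coefficient = binary_coefficient(0.5, 0.01)
check = tilted_rate(0.01, coefficient)
```

Under the same assumption, a coefficient of 6 with a target of 1/2 would correspond to a base rate of roughly 0.25%.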

## Features and Scorers

### Multiple Scorers

Let's go beyond the very simple example of looking for "amazing" in our samples.  
A first thing we can do is have multiple such features.

In [None]:
is_rainy = lambda s, c: bool(re.search(r"\brain\b", s.text))
rainy_scorer = BooleanScorer(is_rainy)

With this second scorer, we can constrain our base model, wishing for 50% of the samples to include the word "amazing" and 33% the word "rain".

In [None]:
target = base.constrain([amazing_scorer, rainy_scorer], [1/2, 1/3],
        n_samples=2**10,
        context_distribution=SingleContextDistribution(incipit))
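
With several features there is in general no closed form for the coefficients. The sketch below shows one possible way to fit them, assuming an exponential-family EBM and plain gradient steps on a made-up population of base samples. It is meant as an illustration of moment matching, and disco's own estimation may differ in its details.

```python
import math

# hypothetical base samples, summarized by their two binary features
# (has "amazing", has "rain") and by how many samples show each combination
population = {
    (1, 1): 10,
    (1, 0): 40,
    (0, 1): 90,
    (0, 0): 860,
}
targets = (1 / 2, 1 / 3)

def weighted_means(coefficients):
    """Feature means once each sample is weighted by exp(coefficients . features)."""
    total = 0.0
    sums = [0.0, 0.0]
    for features, count in population.items():
        weight = count * math.exp(sum(c * f for c, f in zip(coefficients, features)))
        total += weight
        for i, f in enumerate(features):
            sums[i] += weight * f
    return [s / total for s in sums]

def fit_coefficients(steps=2000, learning_rate=1.0):
    """Move each coefficient in proportion to the gap between target and current mean."""
    coefficients = [0.0, 0.0]
    for _ in range(steps):
        means = weighted_means(coefficients)
        coefficients = [c + learning_rate * (t - m)
                        for c, t, m in zip(coefficients, targets, means)]
    return coefficients

coefficients = fit_coefficients()
means = weighted_means(coefficients)
```

After fitting, `means` matches `targets` closely: the two coefficients are solved jointly because the features are correlated in the population.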

### Function-based Scorer

<u>Readability</u>

Let's now try something a bit more ambitious than just looking for the presence of "amazing", be it the string or the word. What about readability? If we can score our samples using, for example, a FOG index, we might use that as a feature, a preference expressed in an EBM.

There are useful packages available to compute readability indices, but we can try to define our own functions: the [FOG](https://en.wikipedia.org/wiki/Gunning_fog_index) index is a classic measure.
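
For reference, the index is usually defined as follows, where complex words are those with three or more syllables:

$$\text{FOG} = 0.4 \left( \frac{\text{words}}{\text{sentences}} + 100 \cdot \frac{\text{complex words}}{\text{words}} \right)$$

The functions below are a rough approximation of it: syllables are estimated by counting vowel letters, a word counts as complex when it has more than three of them, and the rounded result is clamped between 6 and 17.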

In [None]:
def extract_words(text):
    return re.findall(r'\w+', text)

def count_sentences(text):
    if "" == text:
        return 0
    marks = set(".!?")
    rw_lngth = len([l for l in text if l in marks])
    return rw_lngth if 0 < rw_lngth else 1

def count_syllables(word):
    vowels = set("aeiou")
    return len([l for l in word if l.lower() in vowels])

In [None]:
def fog(text):
    l, h = 6, 17  # bounds the index is clamped to
    wrds = extract_words(text)
    n_wrds = len(wrds)
    if 0 == n_wrds:
        # empty or punctuation-only text: avoid dividing by zero
        return l
    n_cmplx_wrds = len([w for w in wrds if 3 < count_syllables(w)])
    n_sntncs = count_sentences(text)
    rw_scr = round(0.4 * (n_wrds / n_sntncs + 100 * n_cmplx_wrds / n_wrds))
    return min(h, max(l, rw_scr))

In [None]:
star_wars = """It is a period of civil war. Rebel spaceships,
striking from a hidden base, have won their first victory against
the evil Galactic Empire. During the battle, Rebel spies managed
to steal secret plans to the Empire’s ultimate weapon, the DEATH STAR,
an armoured space station with enough power to destroy an entire planet."""

In [None]:
fog(star_wars)

One way to use this `fog()` is to pass it to a `BooleanScorer`, in a scoring predicate.

In [None]:
target = base * BooleanScorer(lambda s, c: fog(s.text) < 13)

Or, to make things a bit clearer, if a bit more verbose:

In [None]:
def easy(s, _):
    """a FOG index lower than 13 means that the text
    should be readable without a college education"""
    return fog(s.text) < 13

In [None]:
target = base * BooleanScorer(easy)

Let's check what that gives for a few samples from a default GPT-2, using our infamous [incipit](https://en.wikipedia.org/wiki/It_was_a_dark_and_stormy_night).

In [None]:
proposal = LMDistribution()
samples, _ = proposal.sample(context=incipit)

In [None]:
target.log_score(samples, context=incipit)

We defined pointwise constraints with our products so we expect all generated samples to be easily readable.  
Obviously we could define a distributional constraint, asking for only half of our samples to be easy, for example. What would make even more sense here is to state that we want our sentences to have _on average_ a FOG index corresponding to the end of high school.

To do this we're going to use a more generic `PositiveScorer` instead.

In [None]:
from disco.scorers.positive_scorer import PositiveScorer

In [None]:
target = base.constrain([PositiveScorer(lambda s, c: fog(s.text))], [12],
    context_distribution=SingleContextDistribution(incipit))

Or, again, using a file of incipits for variable contexts:

In [None]:
target = base.constrain([PositiveScorer(lambda s, c: fog(s.text))], [12],
        n_samples=2**9,
        context_distribution=ContextDistribution("data/incipits.txt"), context_sampling_size=2**3)

Yet another way to define our feature is to subclass a `PositiveScorer` in our own `FogScorer`.

In [None]:
import torch

In [None]:
class FogScorer(PositiveScorer):
    """
    FOG scoring class
    """

    def _extract_words(self, text):
        return re.findall(r'\w+', text)

    def _count_sentences(self, text):
        if "" == text:
            return 0
        marks = set(".!?")
        rw_lngth = len([l for l in text if l in marks])
        return rw_lngth if 0 < rw_lngth else 1

    def _count_syllables(self, word):
        vowels = set("aeiou")
        return len([l for l in word if l.lower() in vowels])

    def fog(self, sample, _):
        text = sample.text
        l, h = 6, 17  # bounds the index is clamped to
        wrds = self._extract_words(text)
        n_wrds = len(wrds)
        if 0 == n_wrds:
            # empty or punctuation-only text: avoid dividing by zero
            return l
        n_cmplx_wrds = len([w for w in wrds if 3 < self._count_syllables(w)])
        n_sntncs = self._count_sentences(text)
        rw_scr = round(0.4 * (n_wrds / n_sntncs + 100 * n_cmplx_wrds / n_wrds))
        return min(h, max(l, rw_scr))

    def __init__(self):
        self.scoring_function = self._broadcast(self.fog)

    def log_score(self, samples, context):
        return torch.log(self.score(samples, context))

    def score(self, samples, context):
        return torch.tensor(self.scoring_function(samples, context))

This scorer can be used in much the same way:

In [None]:
target = base.constrain([FogScorer()], [12],
        n_samples=2**9,
        context_distribution=ContextDistribution("data/incipits.txt"), context_sampling_size=2**3)
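
The subclass relies on a broadcasting helper to apply a per-sample function to a whole list of samples. The sketch below shows what such a pattern can look like in plain Python. `broadcast` and `ToyPositiveScorer` are hypothetical stand-ins written for this illustration, not disco's actual implementation, and they work on plain strings and lists rather than on `TextSample` objects and tensors.

```python
import math

def broadcast(function):
    """Turn a per-sample function into one that handles a list of samples."""
    def broadcasted(samples, context):
        return [function(sample, context) for sample in samples]
    return broadcasted

class ToyPositiveScorer:
    # hypothetical stand-in for illustration, not disco's PositiveScorer
    def __init__(self, scoring_function):
        self.scoring_function = broadcast(scoring_function)

    def score(self, samples, context):
        return self.scoring_function(samples, context)

    def log_score(self, samples, context):
        # scores must be strictly positive for the logarithm to be defined
        return [math.log(s) for s in self.score(samples, context)]

# toy feature: the number of whitespace-separated words in a text
word_count_scorer = ToyPositiveScorer(lambda text, _: len(text.split()))
scores = word_count_scorer.score(["a b c", "a"], "")
log_scores = word_count_scorer.log_score(["a b c", "a"], "")
```

This is also why the FOG feature is a convenient positive score: it is clamped to at least 6, so its logarithm is always defined.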