In [None]:
from fonduer.supervision import Labeler
from fonduer.supervision.models import GoldLabel
from fonduer.features import Featurizer
from fonduer.candidates.models import Candidate
from fonduer.parser.models import Document

from snorkel.labeling import LFAnalysis
from snorkel.labeling.model import LabelModel

from MeMoKBC.pipeline.utils import get_session, load_candidates, match_label_matrix
from MeMoKBC.definitions.candidates import NameFullAbbr, NameAbbrTask
from MeMoKBC.pipeline.lfs.name_short_long_lfs import short_long_lfs
from MeMoKBC.pipeline.lfs.name_short_task_lfs import name_abbr_task_lfs
from MeMoKBC.gold_label_matcher import match_gold_label

In [None]:
# Open the database session that the labeler, featurizer and loader helpers below all share.
# NOTE(review): the only argument is an empty string — presumably a connection
# string or database name that has to be filled in before running; confirm.
session = get_session("")

In [None]:
# Candidate classes processed in this notebook.
# The order matters: later cells unpack per-class results positionally as
# (NFA, NAT) — presumably the helpers return them in this list's order; verify.
candidates = [NameFullAbbr, NameAbbrTask]

In [None]:
# Match the gold labels from the JSON annotation file against the candidates
# stored in the database
gold_labels = match_gold_label(
    "pipeline6",
    "/data/Goldlabel_biomedRxiv/goldlabel1_docs801-840_laura/goldlabel_authorlong_short_task_medRxiv.json",
    [NameAbbrTask, NameFullAbbr],
)

# Collect the ids of the matched candidates, one list per candidate class
nat_cands = []
nfa_cands = []
for cand in gold_labels:
    cand_class = type(cand)

    if cand_class == NameFullAbbr:
        # every matched NameFullAbbr candidate counts as a gold label
        nfa_cands.append(cand.id)
        continue

    if cand_class != NameAbbrTask:
        # neither of the two candidate classes: ignore
        continue

    # NameAbbrTask: keep the candidate only when its first two mentions
    # lie in the same sentence
    first_sentence = cand[0].context.sentence
    second_sentence = cand[1].context.sentence
    if first_sentence.id == second_sentence.id:
        nat_cands.append(cand.id)

Now that we know which candidates are our gold labels, we still need to inform Fonduer of this. To be able to write this information into the database, Fonduer needs a function that labels these candidates as gold labels.

In [None]:
# create labeler object
labeler = Labeler(session, candidates)

# Gold candidate ids per candidate class. Converting the id lists to frozensets
# once makes each membership test in gold() O(1) instead of a linear scan of the
# list for every candidate that is labelled.
_GOLD_IDS_BY_CLASS = {
    NameAbbrTask: frozenset(nat_cands),
    NameFullAbbr: frozenset(nfa_cands),
}


def gold(c: Candidate) -> int:
    """Return the gold label of a candidate.

    Args:
        c: The candidate to label.

    Returns:
        1 if the candidate's id is among the gold ids collected for its
        candidate class, otherwise 0. Candidates of any other class are
        labelled 0 as well.
    """
    # Classes without gold ids fall back to an empty tuple, so the lookup is False
    gold_ids = _GOLD_IDS_BY_CLASS.get(type(c), ())
    return 1 if c.id in gold_ids else 0


# Apply the gold label function for each candidate class and store the result
# in the GoldLabel table
labeler.apply(lfs=[[gold], [gold]], table=GoldLabel, train=True)

To be able to train our model we need to give it a set of candidates and their gold labels. This input takes the form of two lists in which each entry represents a candidate. For each candidate there are multiple entries that represent the outcomes of the different labeling functions. The same applies to the gold labels: we need a complete list of all candidates where each entry represents the output of the gold function.

In [None]:
# load candidates
# (second argument 0 — presumably the split index of the training set; confirm)
train_cands = load_candidates(session, 0, candidates)

# match the candidates with the outcome of the labeling functions to generate input for the label model
# (one label matrix per candidate class, unpacked positionally as NFA, NAT)
L_train_NFA, L_train_NAT = match_label_matrix(session, candidates, 0) 

# load the gold labels of the training candidates, again one result per candidate class
L_gold_train_NFA, L_gold_train_NAT = labeler.get_gold_labels(train_cands)

Create the Label Models

In [None]:
from pathlib import Path

# One Snorkel label model per candidate class.
# cardinality=2: two classes, matching the 0/1 output of the gold function.
gen_model_NFA = LabelModel(cardinality=2)
gen_model_NAT = LabelModel(cardinality=2)

Because the gold labels are returned as a 2-dimensional array, e.g. [[0], [1], [0]], we still need to flatten them with .reshape(-1), which removes one dimension and results in [0, 1, 0].

In [None]:
# Fit the NFA label model on the training label matrix; the flattened gold
# labels are passed as Y_dev
gen_model_NFA.fit(L_train=L_train_NFA, Y_dev=L_gold_train_NFA.reshape(-1), n_epochs=500, log_freq=100)

# save() opens the destination file directly and fails if the folder is
# missing, so create it first (no-op when it already exists)
Path("models").mkdir(parents=True, exist_ok=True)
gen_model_NFA.save(destination="models/label_model_NFA.pkl")

In [None]:
# Fit the NAT label model on the training label matrix; the flattened gold
# labels are passed as Y_dev
gen_model_NAT.fit(L_train=L_train_NAT, Y_dev=L_gold_train_NAT.reshape(-1), n_epochs=500, log_freq=100)

# save() opens the destination file directly and fails if the folder is
# missing, so create it first (no-op when it already exists)
Path("models").mkdir(parents=True, exist_ok=True)
gen_model_NAT.save(destination="models/label_model_NAT.pkl")

### Generating train marginals

In [None]:
# Probabilistic training labels for the NFA candidates, predicted by the fitted
# label model from the NFA label matrix (indexed below as [candidate, class])
train_marginals_NFA = gen_model_NFA.predict_proba(L_train_NFA)

In [None]:
# Probabilistic training labels for the NAT candidates, predicted by the fitted
# label model from the NAT label matrix (indexed below as [candidate, class])
train_marginals_NAT = gen_model_NAT.predict_proba(L_train_NAT)

In [None]:
import numpy as np

In [None]:
import matplotlib.pyplot as plt

# Number of histogram bins shared by both panels
bins = 20

# Two side-by-side panels on a 12 x 4 inch figure.
# tight_layout expects a bool (or a dict of padding options); the previous
# string argument only enabled it because a non-empty string is truthy.
fig, ax = plt.subplots(1, 2, figsize=(12, 4), tight_layout=True)

# NOTE(review): the titles say "TRUE", but the plotted value is the highest
# class probability per candidate (max over both classes), not P(TRUE) —
# confirm which one is intended.
ax[0].hist(np.max(train_marginals_NFA, axis=1), bins=bins)
ax[0].set_title("NFA(TRUE)")
ax[0].set_xlabel("highest class probability")
ax[0].set_ylabel("number of candidates")

ax[1].hist(np.max(train_marginals_NAT, axis=1), bins=bins)
ax[1].set_title("NAT(TRUE)")
ax[1].set_xlabel("highest class probability")
ax[1].set_ylabel("number of candidates")
plt.show()

### Iterate on LFs

In [None]:
# Fresh labeler for iterating on the labeling functions
# (rebinds the `labeler` created for the gold labels above with the same arguments)
labeler = Labeler(session, candidates)

In [None]:
# Load the candidates of split 1 (presumably the development set; confirm)
dev_cands = load_candidates(session, split=1, candidate_list=candidates)

# Label matrices for the same split, one per candidate class (NFA, NAT)
L_dev_NFA, L_dev_NAT = match_label_matrix(session, candidates, 1)

## Discriminative Model

In [None]:
import numpy as np

# Smallest gap between the two class probabilities for a candidate to be kept
# as training data for the discriminative model
MIN_CONFIDENCE_GAP = 0.2
# Probability of class 1 above which a kept candidate is labelled 1
POSITIVE_THRESHOLD = 0.5

# Gap between the higher and the lower class probability of each candidate:
# the bigger the gap, the more certain the label model is.
# Example: True = 0.4, False = 0.6 --> gap = 0.6 - 0.4 = 0.2, which is exactly
# on the threshold and therefore NOT kept (the comparison below is strict).
diffs_NAT = train_marginals_NAT.max(axis=1) - train_marginals_NAT.min(axis=1)
diffs_NFA = train_marginals_NFA.max(axis=1) - train_marginals_NFA.min(axis=1)

# Row indices of the candidates the label model is sufficiently sure about
train_idxs_NAT = np.where(diffs_NAT > MIN_CONFIDENCE_GAP)[0].astype(np.int64)
train_idxs_NFA = np.where(diffs_NFA > MIN_CONFIDENCE_GAP)[0].astype(np.int64)

# Probability of class 1 (second column) for the kept candidates
filtered_NAT = train_marginals_NAT[train_idxs_NAT, 1]
filtered_NFA = train_marginals_NFA[train_idxs_NFA, 1]

# Cast continuous values to binary for the logistic regression model
y_NAT = np.where(filtered_NAT > POSITIVE_THRESHOLD, 1, 0)
y_NFA = np.where(filtered_NFA > POSITIVE_THRESHOLD, 1, 0)

Get feature matrix and filter with previous filter

In [None]:
# Featurizer bound to the same session and candidate classes; used below to
# fetch the feature matrices of the train and test candidates
featurizer = Featurizer(session, candidates)

In [None]:
# Feature matrices of the training candidates, one per candidate class (NFA, NAT)
F_train_NFA, F_train_NAT = featurizer.get_feature_matrices(train_cands)
# Keep only the rows of the candidates that passed the confidence filter, so
# the rows line up with y_NFA / y_NAT
X_NFA = F_train_NFA[train_idxs_NFA, :]
X_NAT = F_train_NAT[train_idxs_NAT, :]


Train the logistic regression model

In [None]:
from sklearn.linear_model import LogisticRegression, BayesianRidge

# Train the discriminative model on the filtered NFA features and their binary
# labels. The bare names `X` and `y` used before are never defined in this
# notebook, so the cell failed on a fresh kernel; NFA is used because the test
# features in the next cell are taken from the first (NameFullAbbr) matrix.
clf = LogisticRegression(max_iter=200).fit(X_NFA, y_NFA)

In [None]:
# Load the candidates of split 2 (presumably the test set; confirm)
test_cands = load_candidates(session, 2, candidates)
# Feature matrix of the first candidate class only; with
# candidates = [NameFullAbbr, NameAbbrTask] that is NameFullAbbr.
# NOTE(review): the suffix "NFT" looks like a typo for "NFA" — name kept as is.
F_test_NFT = featurizer.get_feature_matrices(test_cands)[0]

# Predict class labels for the test candidates with the logistic regression model
preds = clf.predict(F_test_NFT)

In [None]:
# Bayesian ridge regression on the same filtered NFA training data. The bare
# names `X` and `y` used before are never defined in this notebook. The feature
# matrix is converted with .toarray() before fitting.
reg = BayesianRidge().fit(X_NFA.toarray(), y_NFA)