# Args generation on tabular data
Examples from Leila's paper

In [None]:
from sklearn.preprocessing import OneHotEncoder
import numpy as np
from itertools import combinations
from tqdm import tqdm
from pprint import pprint

In [None]:
# Toy dataset: every combination of two binary features, one row per instance.
example = [[0, 0], [0, 1], [1, 0], [1, 1]]
# One class label per row of `example`; only the row [1, 0] is labelled 1.
ex_labels = [0, 0, 1, 0]

In [None]:
# Hiking toy dataset: seven instances described by four binary features
# (the names 'V', 'C', 'M', 'E' are given to these columns when the encoder's
# feature names are requested further down).
hiking_ex = [[0, 0, 1, 0],
             [1, 0, 0, 0],
             [0, 0, 1, 1],
             [1, 0, 0, 1],
             [0, 1, 1, 0],
             [0, 1, 1, 1],
             [1, 1, 0, 1]]
#            [1, 1, 0, 0]]
# One binary class label per row of `hiking_ex`.
hiking_labels = [0, 1, 0, 1, 0, 0, 1]  # ,1]

In [None]:
# One-hot encode the hiking instances: every (feature, value) pair gets its own binary column.
# The `sparse=True` keyword is not passed: sparse output is already the encoder's default, and
# the keyword was renamed to `sparse_output` in scikit-learn 1.2 and removed in 1.4.
oh_enc = OneHotEncoder(handle_unknown='ignore')
X = oh_enc.fit_transform(hiking_ex).todok()

# Names of the encoded columns, built from the given feature names and the observed values.
features_name_hiking = oh_enc.get_feature_names_out(['V', 'C', 'M', 'E'])
# Dense transpose: one row per encoded column, one column per instance.
t_X = X.transpose().toarray()

print(X.toarray())

[[1. 0. 1. 0. 0. 1. 1. 0.]
 [0. 1. 1. 0. 1. 0. 1. 0.]
 [1. 0. 1. 0. 0. 1. 0. 1.]
 [0. 1. 1. 0. 1. 0. 0. 1.]
 [1. 0. 0. 1. 0. 1. 1. 0.]
 [1. 0. 0. 1. 0. 1. 0. 1.]
 [0. 1. 0. 1. 1. 0. 0. 1.]]


In [None]:
# For every encoded column, the indices of the instances in which that column is non-zero.
instances_by_feature = {
    feature_idx: list(np.where(column)[0])
    for feature_idx, column in enumerate(t_X)
}

print(instances_by_feature)

{0: [0, 2, 4, 5], 1: [1, 3, 6], 2: [0, 1, 2, 3], 3: [4, 5, 6], 4: [1, 3, 6], 5: [0, 2, 4, 5], 6: [0, 1, 4], 7: [2, 3, 5, 6]}


In [None]:
def generate_args_lenN(n, ibyf, dataset, predictions, minimals=None):
    """
    Generates arguments of length n, given arguments of length 1.. n-1

    A candidate is a combination of n features that are all non-zero in a same row. It is kept
    when every row containing all of its features has the same prediction and none of its
    sub-combinations is already an argument for the class of the row it was drawn from.

    :param n: length of arguments to be generated (the assertion below requires that exactly
              n-1 lengths were generated before)
    :param ibyf: instances_by_feature, maps a feature index to the indices of the rows in
                 which that feature is non-zero
    :param dataset: iterable of rows, one per instance; the non-zero positions of a row are
                    its features
    :param predictions: predictions[i] is the class (used as index 0 or 1) of row i
    :param minimals: pair (class 0, class 1) of lists where minimals[cl][k] is the set of
                     arguments (frozensets) of length k+1 for class cl. It is extended in
                     place with the arguments of length n. None starts from scratch.
    :return: (args, minimals) where args is [arguments of class 0, arguments of class 1]
             found at length n, and minimals is the updated structure
    """

    def is_minimal(potential_arg, cl, minimals, n):
        # cl is class
        # Minimal means: no sub-combination of length 1..n is already an argument for cl.
        return not any(
            frozenset(comb_) in minimals[cl][k]
            for k in range(n)
            for comb_ in combinations(potential_arg, k + 1)
        )

    if minimals is None:
        minimals = ([], [])
    assert len(minimals[0]) == n-1
    minimals[0].append(set())
    minimals[1].append(set())

    args = [set(), set()]
    potential_args_checked_count = 0
    for i, row in enumerate(dataset):
        cl = predictions[i]  # same for every candidate drawn from this row
        for potential_arg in combinations(np.where(row)[0], n):
            potential_args_checked_count += 1
            if not is_minimal(potential_arg, cl, minimals, n-1):
                continue
            selection = set.intersection(*[set(ibyf[w]) for w in potential_arg])  # all rows with all features of potential argument
            selection_classes = {predictions[i_] for i_ in selection}
            # The candidate is an argument only when every selected row agrees on the class.
            if len(selection_classes) == 1:
                candidate = frozenset(potential_arg)
                args[selection_classes.pop()].add(candidate)
                minimals[cl][n-1].add(candidate)
    print(potential_args_checked_count, ' potential arg checked.')
    return args, minimals

  

def read_args(minimals, feature_names):
    """
    Translates arguments made of feature indices into tuples of feature names.

    :param minimals: per class, a sequence (one entry per argument length) of collections of
                     arguments, each argument being an iterable of feature indices
    :param feature_names: indexable giving the name of each feature index
    :return: [names of the arguments of class 0, names of the arguments of class 1]
    """
    arguments = [[], []]
    for cl, args_by_length in enumerate(minimals):
        for same_length_args in args_by_length:
            for arg in same_length_args:
                named = tuple(feature_names[k] for k in arg)
                arguments[cl].append(named)
    return arguments


In [None]:
# Generate arguments of increasing length until a length brings no new argument
# for either class.
n = 0
minimals = None
print("len ", n, ":", minimals)
while True:
    n += 1
    args, minimals = generate_args_lenN(
        n, instances_by_feature, X.toarray(), hiking_labels, minimals
    )
    print("len ", n, ":", minimals)
    if not minimals[0][-1] and not minimals[1][-1]:
        break

len  0 : None
28  potential arg checked.
len  1 : ([{frozenset({5}), frozenset({0})}], [{frozenset({1}), frozenset({4})}])
42  potential arg checked.
len  2 : ([{frozenset({5}), frozenset({0})}, {frozenset({3, 6})}], [{frozenset({1}), frozenset({4})}, set()])
28  potential arg checked.
len  3 : ([{frozenset({5}), frozenset({0})}, {frozenset({3, 6})}, set()], [{frozenset({1}), frozenset({4})}, set(), set()])


In [None]:
# Display the arguments of each class with one-hot feature names instead of column indices.
pprint(read_args(minimals, features_name_hiking))

[[('M_1',), ('V_0',), ('C_1', 'E_0')], [('V_1',), ('M_0',)]]


# Args generation on Text data
### Data preparation

In [None]:
from IPython.display import clear_output

In [None]:
# Download the spaCy English model and install the anchor-exp and fasttext packages,
# then clear the installation logs from the cell output.
!python -m spacy download en_core_web_sm
!pip install anchor-exp fasttext
clear_output()

Collecting en_core_web_sm==2.2.5
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-2.2.5/en_core_web_sm-2.2.5.tar.gz (12.0 MB)
[K     |████████████████████████████████| 12.0 MB 587 kB/s 
[38;5;2m✔ Download and installation successful[0m
You can now load the model via spacy.load('en_core_web_sm')
Collecting anchor-exp
  Downloading anchor_exp-0.0.2.0.tar.gz (427 kB)
[K     |████████████████████████████████| 427 kB 4.4 MB/s 


In [None]:
import anchor
import spacy
import fasttext
import os
import sklearn.model_selection
from anchor import anchor_text
from tqdm import tqdm
import numpy as np
from pprint import pprint
import itertools
import pandas as pd
from functools import reduce
import networkx as nx
import matplotlib.pyplot as plt
from nltk.corpus import stopwords
from collections import defaultdict
from operator import itemgetter

In [None]:
# Mount Google Drive in the Colab runtime, list the polarity dataset folder,
# then switch to the `argumentation` working directory and print it.
from google.colab import drive
drive.mount('/content/gdrive')
!ls "/content/gdrive/MyDrive/Colab Notebooks/datasets/rt-polaritydata/rt-polaritydata"
%cd "/content/gdrive/MyDrive/Colab Notebooks/wd/argumentation"
!pwd

In [None]:
def load_polarity(path):
    """
    Loads the texts of 'rt-polarity.neg' (label 0) and 'rt-polarity.pos' (label 1).

    Lines that are not valid UTF-8 are skipped. Surrounding whitespace and surrounding double
    quotes are removed, and remaining double quotes are replaced by single quotes.

    :param path: directory containing the two polarity files
    :return: (data, labels), two lists of same length: the texts and their 0/1 labels
    """
    data = []
    labels = []
    f_names = ['rt-polarity.neg', 'rt-polarity.pos']
    for (l, f) in enumerate(f_names):
        with open(os.path.join(path, f), 'rb') as polarity_file:
            for raw_line in polarity_file:
                try:
                    # Work on the decoded text. Going through str(bytes) and stripping the
                    # characters "b" and "'" also ate the beginning of texts such as "bad ...".
                    line = raw_line.strip().decode('utf8')
                except UnicodeDecodeError:
                    continue
                line = line.strip("\"")
                line = line.replace('\"', '\'')
                data.append(line)
                labels.append(l)
    return data, labels

def write_file(dataset, labels, file_name):
    """
    Writes one line per text in the form '__label__<label>__label__ <text>'.

    :param dataset: sequence of texts
    :param labels: labels[i] is the label of dataset[i]
    :param file_name: path of the file to (over)write
    :return: None
    """
    flag = "__label__"
    with open(file_name, 'w') as f:
        # The unused per-iteration `str(dataset)` of the previous version is gone: it
        # stringified the whole dataset for every single line written.
        for i, text in enumerate(tqdm(dataset)):
            f.write(flag + str(labels[i]) + flag + " " + str(text) + '\n')

In [None]:
# Load the polarity corpus and split it with fixed seeds.
data, labels = load_polarity("/content/gdrive/MyDrive/Colab Notebooks/datasets/rt-polaritydata/rt-polaritydata")
# 20% of the data is held out as the test set.
train, test, train_labels, test_labels = sklearn.model_selection.train_test_split(data, labels, test_size=.2, random_state=42)
# 10% of the remaining training data becomes the validation set.
train, val, train_labels, val_labels = sklearn.model_selection.train_test_split(train, train_labels, test_size=.1, random_state=42)

print('data sample:')
print(train[0])

# Set to True to (re)write the three split files on disk.
re_write_files = False

if re_write_files:
    print("Writing train")
    write_file(train, train_labels, 'rt2.train')
    print("Writing dev")
    write_file(val, val_labels, 'rt2.dev')
    print("Writing test")
    write_file(test, test_labels, 'rt2.test')

# Set to True to train and save a new fastText classifier; otherwise the saved model is loaded.
re_train = False
if re_train:
    rt_model = fasttext.train_supervised(input="rt2.train")
    rt_model.save_model("rt2.model")
else:
    rt_model = fasttext.load_model("rt2.model")

# Args generation on Text data
### Tests

In [None]:
def predict_rt(sample):
    """
    Predicts `sample` with the module-level `rt_model` and encodes the result as 0/1.

    :param sample: input forwarded as is to `rt_model.predict`
    :return: numpy array holding 1 where the first label of an entry is
             '__label__1__label__' (positive) and 0 otherwise
    """
    predicted_labels = np.array(rt_model.predict(sample)[0])
    return np.array([
        1 if entry[0] == '__label__1__label__' else 0  # 1 is POSITIVE
        for entry in predicted_labels
    ])

def exemple_explain_with_anchor():
    """
    Explains the prediction of one hard-coded sentence with an Anchor text explainer and prints
    the prediction, the anchor, its precision and the example texts returned by the explanation.

    :return: None
    """
    nlp = spacy.load("en_core_web_sm")

    explainer = anchor_text.AnchorText(nlp, ['negative', 'positive'], use_unk_distribution=False)

    text = "It is a good movie"
    #text='the latest installment in the pokemon canon' # , pokemon 4ever is surprising less moldy and trite than the last two , likely because much of the japanese anime is set in a scenic forest where pokemon graze in peace .'
    # Name of the predicted class and of the other class (two classes only).
    pred = explainer.class_names[predict_rt([text])[0]]
    alternative = explainer.class_names[1 - predict_rt([text])[0]]
    print('Prediction: %s' % pred)
    exp = explainer.explain_instance(text, predict_rt, threshold=0.95)

    print('Anchor: %s' % (' AND '.join(exp.names())))
    print('Precision: %.2f' % exp.precision())
    print()
    print('Examples where anchor applies and model predicts %s:' % pred)
    print()
    print('\n'.join([x[0] for x in exp.examples(only_same_prediction=True)]))
    print()
    print('Examples where anchor applies and model predicts %s:' % alternative)
    print()
    print('\n'.join([x[0] for x in exp.examples(partial_index=0, only_different_prediction=True)]))
    # Raw model output for a second hard-coded sentence.
    print(rt_model.predict("Definitely not a good movie"))


def time_test():
    """
    Prints, for growing prefixes of a hard-coded sentence, the prediction, the anchor, its
    precision and the time spent on that prefix.

    :return: None
    """
    import time
    text = 'the latest installment in the pokemon canon, pokemon 4ever is surprising less moldy and trite than the last two , likely because much of the japanese anime is set in a scenic forest where pokemon graze in peace .'
    tab_text = text.split()
    # NOTE(review): the range stops at len(tab_text) - 1 words, so the complete sentence is
    # never explained -- confirm this is intended.
    for i in range(1, len(tab_text)):
        start = time.time()
        text_ = " ".join(tab_text[:i])
        print(text_)
        # NOTE(review): the spaCy model and the explainer are created after `start`, so the
        # reported time includes their creation on every iteration.
        nlp = spacy.load("en_core_web_sm")
        explainer = anchor_text.AnchorText(nlp, ['negative', 'positive'], use_unk_distribution=False)
        pred = explainer.class_names[predict_rt([text_])[0]]
        alternative = explainer.class_names[1 - predict_rt([text_])[0]]
        print('Prediction: %s' % pred)
        exp = explainer.explain_instance(text_, predict_rt, threshold=0.95)
        print('Anchor: %s' % (' AND '.join(exp.names())))
        print('Precision: %.2f' % exp.precision())
        print("time for " + str(i) + "words :", time.time() - start)


def evaluate_coherence(file=None):
    """
    Looks for anchors that also apply to texts having a different prediction and prints them.

    Each usable line of `file` looks like `b"<text>",<ground truth>,<prediction>,<anchor>,<precision>`
    where the words of the anchor are separated by "AND". Lines whose split on double quotes
    gives more than three parts are skipped.

    For each distinct anchor, the printed dictionary gives: the indices of the instances
    explained by it, the indices of the texts that contain every word of the anchor but have
    another prediction ([0] when there are 10 or more), and the number of such texts divided
    by the number of anchors.

    NOTE(review): a word "applies" to a text when it is a substring of it, not necessarily a
    whole token -- confirm this is the intended matching.

    :param file: path of the explanations file; None evaluates an empty set of explanations
    :return: None
    """
    texts = []
    anchors = []
    predictions = []
    if file is not None:
        with open(file, 'r') as f:
            for line in f:
                s = line.split("\"")
                if len(s) > 3:
                    continue
                assert s[0] == 'b'
                info = s[2].split(",")
                # The words are written joined by ' AND '; strip the padding left by the split
                # so that each word is matched on its own characters only.
                anchor_words = tuple(w.strip() for w in info[3].split("AND"))
                texts.append(s[1])
                anchors.append(anchor_words)
                predictions.append(info[2])
    assert len(texts) == len(anchors) and len(anchors) == len(predictions)
    print("anchors len:", len(anchors))
    incoherences = {}
    for i, anchor_words in enumerate(anchors):
        if anchor_words in incoherences:
            # Anchor already evaluated: only record this additional instance.
            incoherences[anchor_words][0].append(i)
            continue
        incoherent = [
            j for j, other_text in enumerate(texts)
            if i != j
            and predictions[i] != predictions[j]
            and all(word in other_text for word in anchor_words)
        ]
        if incoherent:
            # Too many conflicting texts are summarised by [0].
            shown = [0] if len(incoherent) >= 10 else incoherent
            incoherences[anchor_words] = ([i], shown, len(incoherent)/len(anchors))
    print("Anchor : ([of instances], [instances that contain anchor but have different predicition or 0 if too many], coverage")
    pprint(incoherences)


# "input, ground_truth, prediction, explanation, precision "
def write_explanations(dataset, filename="rt2_test.explanations", explainer=None):
    """
    Writes one explanation line per text: quoted text, ground truth, prediction, anchor, precision.

    :param dataset: iterable of (text, ground truth) pairs
    :param filename: path of the file to (over)write
    :param explainer: anchor text explainer to use; when None, one is created here the same
                      way as in the other functions of this notebook (spaCy 'en_core_web_sm'
                      model, classes ['negative', 'positive'])
    :return: None
    """
    if explainer is None:
        # No module-level `explainer` is defined by this notebook, so build one on demand.
        nlp = spacy.load("en_core_web_sm")
        explainer = anchor_text.AnchorText(nlp, ['negative', 'positive'], use_unk_distribution=False)
    with open(filename, "w") as f:
        for text_, gt in tqdm(dataset):
            pred_ = explainer.class_names[predict_rt([text_])[0]]
            exp_ = explainer.explain_instance(text_, predict_rt, threshold=0.5)
            anchor_str = ' AND '.join(exp_.names())
            line = ','.join([("b\"" + text_ + "\""), str(gt), pred_, anchor_str, str(exp_.precision())])
            f.write(line + '\n')
            #print(line)

# Args generation on Text data
### Arguments generation

In [None]:
def get_dataset_from_file(file):
    """
    Reads a file whose lines look like '__label__<gt>__label__<text>'.

    The text is what follows the second flag, without trailing newline and without surrounding
    double or single quotes (a leading space after the flag is kept as is).

    :param file: path of the file to read
    :return: list of (text, gt) pairs, gt being kept as a string
    :raises IndexError: when a line does not contain the flag twice
    """
    dataset = []
    flag = "__label__"
    with open(file, "r") as f:
        for line in f:
            s = line.split(flag)
            try:
                gt = s[1].strip(flag)
                text = s[2].strip("\n").strip("\"").strip("\'")
            except IndexError as err:
                print(s)
                print(line)
                # Keep the exception type, but say which line is malformed and keep the cause.
                raise IndexError("malformed line, the label flag is expected twice: %r" % line) from err
            dataset.append((text, gt))
    return dataset


def get_predictions(dataset, model=rt_model):
    """
    Returns a list of predictions of texts in dataset. Format is int 0 (negative) or 1 (positive)

    :param dataset: iterable of (text, ground truth) pairs; only the texts are used
    :param model: classifier whose `predict` is called on the list of texts (defaults to the
                  module-level `rt_model` as it exists when this function is defined)
    :return: list of int, one per text, parsed from the first label predicted for that text
    """
    all_texts = [text for text, _ in dataset]
    # Use the model that was passed in; the parameter used to be ignored.
    predicted_labels = model.predict(all_texts)[0]
    assert len(all_texts) == len(predicted_labels)
    flag = '__label__'
    # Read the first label of every text directly: squeezing the array of labels collapsed a
    # one-text dataset into a scalar that cannot be iterated.
    return [int(text_labels[0].replace(flag, '')) for text_labels in predicted_labels]

In [None]:
# combination not in alphabetical order, potential_args_checked not kept for later
def generate_args_lenN_by_text(n, texts_by_word, predictions, minimals=None, split_texts=None):
    """
    Generates arguments of n words, given the arguments of 1.. n-1 words.

    A candidate is a combination of n distinct words of a same text. It is kept when every
    text containing all of its words has the same prediction and none of its sub-combinations
    is already an argument for the class of the text it was drawn from.

    :param n: number of words of the arguments to generate (the assertion below requires that
              exactly n-1 lengths were generated before)
    :param texts_by_word: maps a word to the set of indices of the texts containing it
    :param predictions: predictions[i] is the class (used as index 0 or 1) of text i
    :param minimals: pair (class 0, class 1) of lists where minimals[cl][k] is the set of
                     arguments (frozensets) of k+1 words for class cl. It is extended in place
                     with the arguments of n words. None starts from scratch.
    :param split_texts: list of texts, each given as a list of words; when None, the
                        module-level `all_split_texts` is used as before
    :return: (args, minimals) where args is [arguments of class 0, arguments of class 1]
             found at length n, and minimals is the updated structure
    """

    def is_minimal(potential_arg, cl, minimals, n):
        # cl is class
        # Minimal means: no sub-combination of 1..n words is already an argument for cl.
        return not any(
            frozenset(comb_) in minimals[cl][k]
            for k in range(n)
            for comb_ in itertools.combinations(sorted(potential_arg), k + 1)
        )

    if split_texts is None:
        split_texts = all_split_texts

    if minimals is None:
        minimals = ([], [])
    assert len(minimals[0]) == n-1
    minimals[0].append(set())
    minimals[1].append(set())

    args = [set(), set()]
    potential_args_checked_count = 0
    for i, text in tqdm(enumerate(split_texts), total=len(split_texts)):
        cl = predictions[i]  # same for every candidate drawn from this text
        # Deduplicate the words: a combination repeating a word is not an argument of n words
        # and only added useless checks (the returned arguments are unchanged).
        for potential_arg in itertools.combinations(sorted(set(text)), n):
            potential_args_checked_count += 1
            if not is_minimal(potential_arg, cl, minimals, n-1):
                continue
            selection = set.intersection(*[texts_by_word[w] for w in potential_arg])  # all texts with all words of potential argument
            selection_classes = {predictions[i_] for i_ in selection}
            # The candidate is an argument only when every selected text agrees on the class.
            if len(selection_classes) == 1:
                candidate = frozenset(potential_arg)
                args[selection_classes.pop()].add(candidate)
                minimals[cl][n-1].add(candidate)
    print(potential_args_checked_count, ' potential arg checked.')
    return args, minimals

In [None]:
def check_args_naive(args_neg, args_pos, dataset=None, predictions=None):
    """
    Checks that no argument supports both classes and prints the outcome and both sizes.

    When the check fails, the first word of one shared argument is printed, followed by the
    prediction of every text containing that word.

    :param args_neg: collection of hashable arguments of the negative class
    :param args_pos: collection of hashable arguments of the positive class
    :param dataset: iterable of (text, ground truth) pairs, only read when the check fails;
                    when None, the module-level `dataset` is used
    :param predictions: predictions[i] is the prediction of text i, only read when the check
                        fails; when None, the module-level `predictions` is used
    :return: None
    """
    intersection = set(args_neg) & set(args_pos)
    if intersection:
        print("Check args failed")
        print(len(intersection))
        print('intersection = ', intersection)
        if dataset is None:
            dataset = globals()['dataset']
        if predictions is None:
            predictions = globals()['predictions']
        all_texts = [text for text, _ in dataset]
        # Arguments may be frozensets, which cannot be indexed: take the first word by iterating.
        first_word = next(iter(next(iter(intersection))))
        print(first_word)
        for i, text in enumerate(all_texts):
            if first_word in text.split():
                print(predictions[i])
    else:
        print("Check args successful, intersection is empty")
    print('len(args_neg)=', len(args_neg))
    print('len(args_pos)=', len(args_pos))


def check_args_consistency(args, dataset, predictions):
    """
    Checks that all the texts containing every word of an argument share the same prediction.

    :param args: iterable of arguments, each being an iterable of words (any number of words)
    :param dataset: iterable of (text, ground truth) pairs; only the texts are used
    :param predictions: predictions[i] is the prediction of text i
    :return: None ('success for consistency' is printed when every argument is consistent)
    :raises AssertionError: at the first argument found in texts with different predictions,
                            after printing the argument, the predictions and the texts
    """
    all_texts = [text for text, _ in dataset]
    # Split every text once instead of once per argument and per word.
    all_tokens = [set(text.split()) for text in all_texts]
    for arg in args:
        words = tuple(arg)
        preds = []
        temp_texts = []
        for i, tokens in enumerate(all_tokens):
            if all(w in tokens for w in words):
                preds.append(predictions[i])
                temp_texts.append(all_texts[i])
                if preds[-1] != preds[0]:
                    print(*words, preds)
                    pprint(temp_texts)
                    raise AssertionError("argument found in texts with different predictions")

    print('success for consistency')

In [None]:
# (text, ground truth) pairs read back from the training file, and the model's
# prediction for each of those texts.
dataset = get_dataset_from_file("rt2.train")
predictions = get_predictions(dataset, rt_model)

In [None]:
import nltk
# Fetch the NLTK stop word lists read by `clean_texts`.
nltk.download("stopwords")

def clean_texts(texts, rm_stop_words=True, rm_punctuation=True, rm_uniques=False):
    """
    Removes stop words, punctuation tokens and/or words occurring only once from split texts.

    :param texts: iterable of texts, each given as an iterable of words; it is iterated twice
                  when rm_uniques is True
    :param rm_stop_words: remove the English NLTK stop words, except a list of words that are
                          kept (e.g. 'but', 'not', 'no', 'very'). The stop words are compared
                          with the lower-cased word. The resulting set is printed.
    :param rm_punctuation: remove the tokens , ' . ; -- ( )
    :param rm_uniques: remove the words (compared as written) that occur exactly once over
                       all the texts
    :return: list of lists of words, one per input text
    """
    if rm_stop_words:
        stop_words = set(stopwords.words('english'))
        # Words kept although NLTK lists them as stop words ('where' used to be written with
        # a trailing space and was therefore never kept).
        stop_words.difference_update({'but', 'between', 'again', 'very', 'out', 'most', 'off', 'until', 'more', 'down',
                                      'while', 'should', 'both', 'no', 'any', 'then', 'because', 'before',
                                      'why', 'so', 'not', 'now', 'where', 'after', 'against', 'further',
                                      'than'})
        print(stop_words)
    else:
        stop_words = set()

    if rm_punctuation:
        stop_words.update({',', '\'', '.', ';', '--', '(', ')'})

    # Words occurring once are kept apart from the stop words: occurrences are counted on the
    # word as written, so they must also be matched as written, not lower-cased.
    unique_words = set()
    if rm_uniques:
        counts = defaultdict(int)
        for text in texts:
            for w in text:
                counts[w] += 1
        unique_words = {w for w, count in counts.items() if count == 1}

    split_texts = []
    for text in tqdm(texts):
        split_texts.append([w for w in text
                            if w.lower() not in stop_words and w not in unique_words])

    return split_texts

In [None]:
# Every word occurrence of the raw texts, in reading order (duplicates included).
all_texts = [text for text, _ in dataset]
vocabulary = [word for text in all_texts for word in text.split()]

# Number of words of each raw text, and their average.
lens_text = [len(text.split()) for text in all_texts]
print('average len=', np.mean(lens_text))

In [None]:
all_split_texts = [text.split() for text, _ in dataset]

# Preprocessing.
print('Preprocessing texts')
all_split_texts = clean_texts(all_split_texts, rm_uniques=True)

# Lengths of the cleaned texts only: appending them to the lengths of the raw texts made the
# "new" average a mix of both.
lens_text = [len(text) for text in all_split_texts]
print('New average len=', np.mean(lens_text))
print(len(all_split_texts))

# Inverted index: for each word, the set of indices of the cleaned texts containing it.
print('Initialising texts_by_word...')
texts_by_word = dict()
for i, text in enumerate(all_split_texts):
    for word in set(text):
        texts_by_word.setdefault(word, set()).add(i)
print('texts_by_word initialised')

# Generate arguments of increasing length until a length brings no new argument
# for either class.
minimals = None
anymore_args = True
k = 1
while anymore_args:
    print("Generating args length %d (by text):" % k)
    [args_neg, args_pos], minimals = generate_args_lenN_by_text(k, texts_by_word, predictions, minimals)
    #pd.to_pickle(minimals[k-1], 'rt2_dev_minimals' + str(k) + '.df')
    check_args_naive(args_neg, args_pos)
    anymore_args = len(minimals[0][-1]) != 0 or len(minimals[1][-1]) != 0
    k += 1

In [None]:
# Display, per class and per length, the arguments found on the text data.
pprint(minimals)