In [None]:
# -*- coding: utf-8 -*-
import glob
import os
from pyexpat import ExpatError
from xml.dom import minidom

import pandas as pd
from nltk.corpus import stopwords
from tqdm import tqdm

STOP_WORDS = set(stopwords.words('english')) | set('the')

pd.set_option('display.width', 1000)
dataset_csv_file = 'dataset_dataframe.csv'
types = set()

training_dataset_dataframe = None


def get_entity_dict(sentence_dom):
    entities = sentence_dom.getElementsByTagName('entity')
    entity_dict = {}
    for entity in entities:
        id = entity.getAttribute('id')
        word = entity.getAttribute('text')
        entity_dict[id] = word
    return entity_dict


def normalize_sentence(row):
    sentence = row.sentence_text.replace('.', ' . ')
    sentence = sentence.replace(',', ' , ')
    e1 = row.e1
    e2 = row.e2
    new_sentence_tokenized = []
    i = 0
    for word in sentence.split():
        if word in STOP_WORDS:
            continue
        if word.lower() == e1.lower():
            new_sentence_tokenized.append('DRUG')
            i += 1
        elif word.lower() == e2.lower():
            new_sentence_tokenized.append('OTHER_DRUG')
            i += 1
        elif i == 0:
            new_sentence_tokenized.append(word + '_bf')
        elif i == 1:
            new_sentence_tokenized.append(word + '_be')
        else:
            new_sentence_tokenized.append(word + '_af')
    normalized_sentence = ' '.join(new_sentence_tokenized).strip()
    # print(e1, e2, ' :  sentence :', sentence, 'new_sentence', normalized_sentence, '\n\n')
    return normalized_sentence


def get_dataset_dataframe(directory=None):
    global training_dataset_dataframe, dataset_csv_file

    if training_dataset_dataframe:
        return training_dataset_dataframe
    global types

    if directory is None:
        directory = os.path.expanduser('E:/VIT/RBL/ddi/dataset/DDICorpus/Train/DrugBank/')

    dataset_csv_file_prefix = str(directory.split('/')[-3]).lower() + '_'

    dataset_csv_file = dataset_csv_file_prefix + dataset_csv_file
    if os.path.isfile(dataset_csv_file):
        df = pd.read_csv(dataset_csv_file)
        return df

    lol = []
    total_files_to_read = glob.glob(directory + '*.xml')
    print('total_files_to_read:' , len(total_files_to_read) , ' from dir: ' , directory)
    for file in tqdm(total_files_to_read):
        try:
            DOMTree = minidom.parse(file)
            sentences = DOMTree.getElementsByTagName('sentence')

            for sentence_dom in sentences:
                entity_dict = get_entity_dict(sentence_dom)

                pairs = sentence_dom.getElementsByTagName('pair')
                sentence_text = sentence_dom.getAttribute('text')
                for pair in pairs:
                    ddi_flag = pair.getAttribute('ddi')
                    print(pair.attributes().items())
                    if not os.path.isfile('types'):
                        types.add(pair.getAttribute('type'))
                    if ddi_flag == 'true':
                        e1 = pair.getAttribute('e1')
                        e2 = pair.getAttribute('e2')
                        relation_type = pair.getAttribute('type')
                        lol.append([sentence_text, entity_dict[e1], entity_dict[e2], relation_type])
        except ExpatError:
            pass

    pd.to_pickle(types, 'types')
    df = pd.DataFrame(lol, columns='sentence_text,e1,e2,relation_type'.split(','))
    df['normalized_sentence'] = df.apply(normalize_sentence, axis=1)
    df.to_csv(dataset_csv_file)
    df = pd.read_csv(dataset_csv_file)
    return df


def get_training_label(row):
    global types

    types = pd.read_pickle('types')
    types = [t for t in types if t]
    type_list = list(types)
    relation_type = row.relation_type
    X = [i for i, t in enumerate(type_list) if relation_type == t]
    # s = np.sum(X)
    if X:
        return X[0]
    else:
        return 1


In [2]:
# -*- coding: utf-8 -*-
from nltk.util import ngrams

from dataset.read_dataset import get_dataset_dataframe
from grammar.chunker import Chunker
from grammar.syntactic_grammar import PatternGrammar

frequent_word_pairs = None
K = 200
import pandas as pd

from spacy.lang.en import English

parser = English()
import os

from itertools import combinations
from collections import Counter


def get_dataset_dictionary():
    top_post_fixed_word_file = 'top_post_fixed_word.pkl'
    if os.path.isfile(top_post_fixed_word_file):
        return pd.read_pickle(top_post_fixed_word_file)
    df = get_dataset_dataframe()
    word_counter = Counter()
    for _, row in df.iterrows():
        unique_tokens = sorted(set(word for word in row.normalized_sentence.split()))
        # exclude duplicates in same line and sort to ensure one word is always before other
        bi_grams = ngrams(row.normalized_sentence.split(), 2)
        word_counter += Counter([' '.join(bi_gram).strip() for bi_gram in bi_grams])
        word_counter += Counter(unique_tokens)
    frequent_words = sorted(list(dict(word_counter.most_common(100000)).keys()))  # return the actual Counter object
    pd.to_pickle(frequent_words, top_post_fixed_word_file)
    return frequent_words


def extract_top_word_pair_features():
    frequent_phrase_pickle_path = 'frequent_phrase.pkl'
    if not os.path.isfile(frequent_phrase_pickle_path):
        df = get_dataset_dataframe()
        pair_counter = Counter()
        for _, row in df.iterrows():

            unique_tokens = sorted(set(word for word in row.normalized_sentence.split()))
            # exclude duplicates in same line and sort to ensure one word is always before other
            combos = combinations(unique_tokens, 2)
            pair_counter += Counter(combos)

        frequent_phrase = sorted(list(dict(pair_counter.most_common(K)).keys()))  # return the actual Counter object
        pd.to_pickle(frequent_phrase, frequent_phrase_pickle_path)
    else:
        frequent_phrase = pd.read_pickle(frequent_phrase_pickle_path)
    print('frequent_phrase: ' , frequent_phrase[:5])
    return frequent_phrase


def extract_top_syntactic_grammar_trio():
    top_syntactic_grammar_trio_file = 'top_syntactic_grammar_trio_file.pkl'
    if os.path.isfile(top_syntactic_grammar_trio_file):
        return pd.read_pickle(top_syntactic_grammar_trio_file)

    df = get_dataset_dataframe()
    trio_counter = Counter()
    for _, row in df.iterrows():
        combos = extract_syntactic_grammar(row.sentence_text)
        trio_counter += Counter(combos)

    frequent_trio_counter = sorted(list(dict(trio_counter.most_common(K)).keys()))  # return the actual Counter object
    pd.to_pickle(frequent_trio_counter, top_syntactic_grammar_trio_file)
    return frequent_trio_counter


def extract_dependency_relations(sentence):
    # TODO : introduce dependency relation later
    parsedEx = parser(sentence)
    for token in parsedEx:
        print(token.orth_, token.dep_, token.head.orth_)


def extract_syntactic_grammar(sentence):
    grammar = PatternGrammar().get_syntactic_grammar(0)
    chunk_dict = Chunker(grammar).chunk_sentence(sentence)
    trigrams_list = []
    for key, pos_tagged_sentences in chunk_dict.items():
        pos_tags = [token[1] for pos_tagged_sentence in pos_tagged_sentences for token in pos_tagged_sentence]
        if len(pos_tags) > 2:
            trigrams = ngrams(pos_tags, 3)
            trigrams_list = [' '.join(trigram) for trigram in trigrams]

    return trigrams_list


# if __name__ == '__main__':
#     df = get_dataset_dataframe()
#     print(get_dataset_dictionary())


In [3]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import nltk

from grammar.pos_tagger import PosTagger



class Chunker:
    def __init__(self, grammar: nltk.RegexpParser):
        self.grammar = grammar

    def chunk_sentence(self, sentence: str):
        pos_tagged_sentence = PosTagger(sentence).pos_tag()
        return dict(self.chunk_pos_tagged_sentence(pos_tagged_sentence))

    def chunk_pos_tagged_sentence(self, pos_tagged_sentence):
        chunked_tree = self.grammar.parse(pos_tagged_sentence)
        chunk_dict = self.extract_rule_and_chunk(chunked_tree)
        return chunk_dict

    def extract_rule_and_chunk(self, chunked_tree: nltk.Tree) -> dict:
        def recursively_get_pos_only(tree, collector_list=None, depth_limit=100):
            if collector_list is None:
                collector_list = []
            if depth_limit <= 0:
                return collector_list
            for subtree in tree:
                if isinstance(subtree, nltk.Tree):
                    recursively_get_pos_only(subtree, collector_list, depth_limit - 1)
                else:
                    collector_list.append(subtree)
            return collector_list

        def get_pos_tagged_and_append_to_chunk_dict(chunk_dict, subtrees):  # params can be removed now
            pos_tagged = recursively_get_pos_only(subtrees)
            chunk_dict[subtrees.label()].append(pos_tagged)

        chunk_dict = nltk.defaultdict(list)
        for subtrees in chunked_tree:
            if isinstance(subtrees, nltk.Tree):
                get_pos_tagged_and_append_to_chunk_dict(chunk_dict, subtrees)
                for sub in subtrees:
                    if isinstance(sub, nltk.Tree):
                        get_pos_tagged_and_append_to_chunk_dict(chunk_dict, sub)
        return chunk_dict


In [4]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import nltk
from nltk import PerceptronTagger


class PosTagger:
    def __init__(self, sentence):
        """

        Args:
            sentence:
        """
        self.sentence = sentence
        self.tagger = PosTagger.get_tagger()

    def pos_tag(self):
        """

        Returns:

        """
        tokens = nltk.word_tokenize(self.sentence)
        pos_tagged_tokens = self.tagger.tag(tokens)
        return pos_tagged_tokens

    @staticmethod
    def get_tagger():
        """

        Returns:

        """
        return PerceptronTagger()


In [5]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import nltk

syntactic_compiled_grammar = {}


class PatternGrammar:
    @property
    def syntactic_grammars(self):
        grammar = {
            0: """
                JJ_VBG_RB_DESCRIBING_NN: {   (<CC|,>?<JJ|JJ.>*<VB.|V.>?<NN|NN.>)+<RB|RB.>*<MD>?<WDT|DT>?<VB|VB.>?<RB|RB.>*(<CC|,>?<RB|RB.>?<VB|VB.|JJ.|JJ|RB|RB.>+)+}
                """,
            1: """
                    VBG_DESRIBING_NN: {<NN|NN.><VB|VB.>+<RB|RB.>*<VB|VB.>}
                """,
        }
        return grammar

    def get_syntactic_grammar(self, index):
        global syntactic_compiled_grammar
        compiled_grammar = syntactic_compiled_grammar.get(index, None)
        if compiled_grammar is None:
            compiled_grammar = self.compile_syntactic_grammar(index)
            syntactic_compiled_grammar[index] = compiled_grammar
        return compiled_grammar

    def compile_syntactic_grammar(self, index):
        return nltk.RegexpParser(self.syntactic_grammars[index])


In [21]:
# -*- coding: utf-8 -*-
from itertools import combinations

from nltk import ngrams
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import numpy as np

dataset_dictionary = None
top_word_pair_features = None
top_syntactic_grammar_list = None

trained_model_pickle_file = 'trained_model.pkl'


def get_empty_vector(n):
    return [0 for _ in range(n)]


def get_top_word_dataset_dictionary():
    from feaure_extraction.feature_vector import get_dataset_dictionary

    global dataset_dictionary
    if dataset_dictionary is None:
        dataset_dictionary = get_dataset_dictionary()
    return dataset_dictionary


def get_top_word_pair_features():
    from feaure_extraction.feature_vector import extract_top_word_pair_features

    global top_word_pair_features
    if top_word_pair_features is None:
        top_word_pair_features = extract_top_word_pair_features()
    return top_word_pair_features


def get_top_syntactic_grammar_list():
    from feaure_extraction.feature_vector import extract_top_syntactic_grammar_trio

    global top_syntactic_grammar_list
    if top_syntactic_grammar_list is None:
        top_syntactic_grammar_list = extract_top_syntactic_grammar_trio()
    return top_syntactic_grammar_list


def get_word_feature(normalized_sentence):
    unique_tokens = set(word for word in normalized_sentence.split())
    # exclude duplicates in same line and sort to ensure one word is always before other
    bi_grams = set(ngrams(normalized_sentence.split(), 2))
    words = unique_tokens | bi_grams
    dataset_dictionary = get_top_word_dataset_dictionary()
    X = [i if j in words else 0 for i, j in enumerate(dataset_dictionary)]
    return X


def get_frequent_word_pair_feature(normalized_sentence):
    unique_tokens = sorted(set(word for word in normalized_sentence.split()))
    # exclude duplicates in same line and sort to ensure one word is always before other
    combos = combinations(unique_tokens, 2)
    top_word_pair_features = get_top_word_pair_features()
    X = [i if j in combos else 0 for i, j in enumerate(top_word_pair_features)]
    return X


def get_syntactic_grammar_feature(sentence_text):
    from feaure_extraction.feature_vector import extract_syntactic_grammar
    trigrams_list = extract_syntactic_grammar(sentence_text)
    top_syntactic_grammar_list = get_top_syntactic_grammar_list()
    X = [i if j in trigrams_list else 0 for i, j in enumerate(top_syntactic_grammar_list)]
    return X


def make_feature_vector(row):
    normalized_sentence = row.normalized_sentence
    sentence = row.sentence_text

    word_feature = get_word_feature(normalized_sentence)
    frequent_word_feature = get_frequent_word_pair_feature(normalized_sentence)
    syntactic_grammar_feature = get_syntactic_grammar_feature(sentence)

    features = word_feature
    features.extend(frequent_word_feature)
    features.extend(syntactic_grammar_feature)
    return features

def main():
    from dataset.read_dataset import get_dataset_dataframe
    df = get_dataset_dataframe()
    X, Y = extract_training_data_from_dataframe(df)
    from sklearn.svm import SVC
    X_train, X_test, y_train, y_test = \
        train_test_split(X, Y, test_size=.2, random_state=42)

    print(df.head())
    print('X: ', (X.shape), 'Y : ', np.array(Y.shape))
    model = SVC(kernel='linear')
    model.fit(X_train, y_train)
    score = model.score(X_test, y_test)
    import pandas as pd

    pd.to_pickle(model, trained_model_pickle_file)
    classification_report()
    print('Score : ', score)
    y_pred = model.predict(X_test)
    print(classification_report(y_test, y_pred))


def extract_training_data_from_dataframe(df):
    from dataset.read_dataset import get_training_label

    X = df.apply(make_feature_vector, axis=1)
    Y = df.apply(get_training_label, axis=1)
    X = np.array(X.tolist())
    Y = np.array(Y.tolist())
    return X, Y


In [22]:
main()

total_files_to_read: 569  from dir:  E:/VIT/RBL/ddi/dataset/DDICorpus/Train/DrugBank/


100%|███████████████████████████████████████████████████████████████████████████████| 569/569 [00:02<00:00, 213.37it/s]


frequent_phrase:  [('(e_be', ',_af'), ('(e_be', ',_be'), ('(e_be', '._af'), ('(e_be', '._be'), ('(e_be', 'DRUG')]
   Unnamed: 0                                      sentence_text  \
0           0  Concurrent administration of a TNF antagonist ...   
1           1  Concurrent therapy with ORENCIA and TNF antago...   
2           2  There is insufficient experience to assess the...   
3           3  Co-administration of naltrexone with Acamprosa...   
4           4  Patients taking Acamprosate concomitantly with...   

               e1               e2 relation_type  \
0  TNF antagonist          ORENCIA        effect   
1         ORENCIA  TNF antagonists        advise   
2         ORENCIA         anakinra        advise   
3      naltrexone      Acamprosate     mechanism   
4     Acamprosate  antidepressants        effect   

                                 normalized_sentence  
0  Concurrent_bf administration_bf TNF_bf antagon...  
1  Concurrent_bf therapy_bf DRUG TNF_be antagonis...  

ValueError: The number of classes has to be greater than one; got 1 class

In [18]:
from sklearn.metrics import classification_report

from dataset.read_dataset import get_dataset_dataframe
from training.train import extract_training_data_from_dataframe, trained_model_pickle_file
import pandas as pd
import os
def predict():
    df = get_dataset_dataframe(directory=os.path.expanduser('E:/VIT/RBL/ddi/dataset/DDICorpus/Test/test_for_ddi_extraction_task/DrugBank/'))
    X, Y = extract_training_data_from_dataframe(df)
    model = pd.read_pickle(trained_model_pickle_file)
    y_pred  = model.predict(X)

    print(classification_report(Y, y_pred))



In [19]:
predict()

total_files_to_read: 158  from dir:  E:/VIT/RBL/ddi/dataset/DDICorpus/Test/test_for_ddi_extraction_task/DrugBank/


100%|███████████████████████████████████████████████████████████████████████████████| 158/158 [00:00<00:00, 287.20it/s]


FileNotFoundError: [Errno 2] No such file or directory: 'trained_model.pkl'