In [23]:
import os
print(os.getcwd())
# Path to the DomiKnowS repository root, absolute or relative.
# A relative path is interpreted against the working directory printed above.
# Set the DOMIKNOWS_ROOT environment variable to point somewhere else without
# editing the notebook; the previous hard-coded location stays the default.
root = os.environ.get('DOMIKNOWS_ROOT', '/home/hfaghihi/Framework/DomiKnowS')

/home/hfaghihi/Framework/DomiKnowS/examples/ACE05


In [24]:
import sys
# Make the DomiKnowS packages importable. The membership check keeps sys.path
# free of duplicate entries when this cell is executed more than once.
if root not in sys.path:
    sys.path.append(root)

In [25]:
from typing import Any
from regr.sensor.pytorch.sensors import ReaderSensor, ConstantSensor, FunctionalSensor, FunctionalReaderSensor, TorchEdgeSensor


class MultiLevelReaderSensor(ConstantSensor):
    """Constant sensor whose data is looked up inside a nested reader item.

    ``keyword`` is either one key path or a tuple of key paths. A key path is
    a plain key (``"text"``) or a dot-separated path in which ``*`` fans out
    over every value of the mapping(s) reached so far, for example
    ``"spans.*.mentions.*.type"``.
    """

    def __init__(self, *pres, keyword=None, edges=None, label=False, device='auto'):
        # No data is available at construction time; `fill_data` sets it.
        super().__init__(*pres, data=None, edges=edges, label=label, device=device)
        self.keyword = keyword

    def fill_data(self, data_item):
        """Resolve ``self.keyword`` against ``data_item`` and store the result.

        ``self.data`` becomes a single looked-up value, or a tuple of values
        (one per path) when ``self.keyword`` is a tuple.

        Raises:
            KeyError: if any segment of a path is missing from ``data_item``.
        """
        try:
            if isinstance(self.keyword, tuple):
                # Build a real tuple. A bare generator expression would defer
                # the lookups (so a KeyError would escape this handler) and
                # would never satisfy the tuple check in `forward`.
                self.data = tuple(self.fetch_key(data_item, keyword) for keyword in self.keyword)
            else:
                self.data = self.fetch_key(data_item, self.keyword)
        except KeyError as e:
            raise KeyError("The key you requested from the reader doesn't exist: %s" % str(e)) from e

    def fetch_key(self, data_item, key):
        """Return the value(s) addressed by ``key`` inside ``data_item``.

        Without a ``.`` in ``key`` this is simply ``data_item[key]``.
        Otherwise the path is walked segment by segment:

        * before any ``*`` has been seen, a segment indexes the current object;
        * the first ``*`` replaces the current mapping by the list of its values;
        * every later ``*`` flattens one more level (all values of every
          mapping in the current list);
        * after a ``*``, a named segment is applied to every element of the list.

        Raises:
            KeyError: if a named segment is missing.
        """
        if "." not in key:
            return data_item[key]

        items = data_item
        fanned_out = False  # becomes True once the first "*" was processed
        for part in key.split("."):
            if part == "*":
                if not fanned_out:
                    items = [items[name] for name in items.keys()]
                    fanned_out = True
                else:
                    items = [item[name] for item in items for name in item.keys()]
            elif fanned_out:
                items = [item[part] for item in items]
            else:
                items = items[part]
        return items

    def forward(self, *_) -> Any:
        """Delegate to the parent ``forward``; once per entry for tuple keywords.

        Returns a tuple of parent results when both ``self.keyword`` and
        ``self.data`` are tuples, otherwise the single parent result.
        """
        if isinstance(self.keyword, tuple) and isinstance(self.data, tuple):
            # Bind the parent method first: zero-argument super() cannot be
            # used inside a generator expression.
            parent_forward = super().forward
            return tuple(parent_forward(data) for data in self.data)
        return super().forward(self.data)

In [130]:
import torch

from regr.graph import Concept
from regr.program import POIProgram
from regr.sensor.pytorch.sensors import ReaderSensor, ConstantSensor, FunctionalSensor, FunctionalReaderSensor, TorchEdgeSensor
from regr.sensor.pytorch.learners import ModuleLearner
from regr.sensor.pytorch.relation_sensors import CandidateSensor, CandidateRelationSensor

from sensors.tokenizers import TokenizerEdgeSensor
from models import Tokenizer, BERT, SpanClassifier, cartesian_concat, token_to_span_candidate, span_candidate_emb, span_label, span_emb, find_is_a


def model(graph):
    """Attach sensors/learners to the concepts of ``graph`` and build a program.

    Looks up the 'linguistic' and 'ACE05' sub-graphs, assigns reader,
    tokenizer and BERT sensors to the document/token concepts, assigns
    annotation sensors to 'span_annotation', and returns a ``POIProgram``
    whose points of interest are token, span_candidate and span.
    """
    graph.detach()

    linguistic = graph['linguistic']
    ace05 = graph['ACE05']
    # These sub-graphs are looked up but not used further in this function.
    entities = ace05['Entities']
    relations = ace05['Relations']
    events = ace05['Events']

    doc = linguistic['document']
    tok = linguistic['token']
    candidate = linguistic['span_candidate']
    annotation = linguistic['span_annotation']
    span = linguistic['span']
    # First relation from each source concept to its target concept.
    doc_to_tok = doc.relate_to(tok)[0]
    # The two span relations are resolved but not used further here.
    span_to_tok = span.relate_to(tok)[0]
    span_to_candidate = span.relate_to(candidate)[0]

    # Document text -> tokens -> token embeddings.
    doc['index'] = ReaderSensor(keyword='text')
    doc_to_tok['forward'] = TokenizerEdgeSensor('index', mode='forward', to=('index', 'ids', 'offset'), tokenizer=Tokenizer())
    tok['emb'] = ModuleLearner('ids', module=BERT())

    # NOTE(review): 'extent' is assigned twice; the second value is a fixed
    # list that looks like a debugging placeholder - confirm it is intended.
    annotation['extent'] = MultiLevelReaderSensor(keyword="spans.*.mentions.*.extent.start")
    annotation['extent'] = ConstantSensor(data=["a", "b", "c", "d"])
    annotation['type'] = MultiLevelReaderSensor(keyword="spans.*.mentions.*.type")

    return POIProgram(graph, poi=(tok, candidate, span,))


In [29]:
from ace05.reader import Reader, DictReader
import config

# Two ad-hoc sensors over per-mention fields ("head.start" and "type") of a
# reader item, plus a reader over the 'train' split configured via `config`.
sensor = MultiLevelReaderSensor(keyword="spans.*.mentions.*.head.start")
sensor1 = MultiLevelReaderSensor(keyword="spans.*.mentions.*.type")
# The name `traint_reader` is used as-is by later cells.
traint_reader = DictReader(config.path, list_path=config.list_path, type='train', status=config.status)
# Disabled smoke test: fill both sensors from the first item and print them.
# sensor.fill_data(next(iter(traint_reader)))
# sensor1.fill_data(next(iter(traint_reader)))
# print(sensor.data)
# print(sensor1.data)

In [31]:
# Take the first item from the reader and display the 'type' field of one
# specific mention; the ids below are hard-coded for that item.
item = next(iter(traint_reader))
item['spans']['MARKETVIEW_20050228.2211-E1']['mentions']['MARKETVIEW_20050228.2211-E1-10']['type']

'NOM'

In [21]:
from regr.sensor.pytorch.query_sensor import DataNodeSensor


In [151]:
def fetch_key(data_item, key):
    """Return the value(s) addressed by ``key`` inside ``data_item``.

    A key without ``.`` is a direct lookup. Otherwise the dot-separated path
    is walked one segment at a time:

    * ``*`` - the first occurrence replaces the current mapping by the list
      of its values; each later occurrence flattens one more level.
    * ``subtype`` / ``type`` - for every element of the current list, emit one
      entry per item in ``element['mentions']``: the ``.name`` of
      ``element[segment]`` when that value is a ``Concept``, else ``None``.
    * any other segment - index the current object (before the first ``*``)
      or every element of the current list (after it).
    """
    if "." not in key:
        return data_item[key]

    current = data_item
    stars_seen = 0
    for part in key.split("."):
        if part == "*":
            stars_seen += 1
            if stars_seen == 1:
                current = [current[name] for name in current.keys()]
            else:
                current = [entry[name] for entry in current for name in entry.keys()]
        elif part in ("subtype", "type"):
            # The label is repeated once per mention of each element.
            labels = []
            for entry in current:
                for _ in range(len(entry['mentions'])):
                    value = entry[part]
                    labels.append(value.name if isinstance(value, Concept) else None)
            current = labels
        elif stars_seen == 0:
            current = current[part]
        else:
            current = [entry[part] for entry in current]
    return current

In [177]:
# Collect span-level type names with the standalone fetch_key defined in this
# notebook, which repeats each span's type once per mention of that span.
input_t = fetch_key(item, "spans.*.type")
print(input_t)
print(len(input_t))

['ORG', 'ORG', 'ORG', 'PER', 'PER', 'PER', 'PER', 'PER', 'PER', 'PER', 'PER', 'PER', 'PER', 'PER', 'PER', 'ORG', 'ORG', 'ORG', 'ORG', 'ORG', 'ORG', 'ORG', 'ORG', 'PER', 'PER', 'PER', 'PER', 'PER', 'PER', 'Timex2', 'Timex2']
31


In [69]:
# Re-create the training reader and take its first item again.
traint_reader = DictReader(config.path, list_path=config.list_path, type='train', status=config.status)
item = next(iter(traint_reader))

In [78]:
from ace05.annotation import *
# Display the 'anchor' entry of one specific event mention (hard-coded ids).
item['events']['MARKETVIEW_20050228.2211-EV1']['mentions']['MARKETVIEW_20050228.2211-EV1-1']['anchor']

{'start': 281, 'end': 286, 'text': 'offer'}

In [77]:
# Rebind sensor1 to the anchor start of every event mention, fill it from the
# first item of a fresh training reader, and print the collected values.
sensor1 = MultiLevelReaderSensor(keyword="events.*.mentions.*.anchor.start")
traint_reader = DictReader(config.path, list_path=config.list_path, type='train', status=config.status)
sensor1.fill_data(next(iter(traint_reader)))
print(sensor1.data)

[281, 738, 775, 304, 412, 640, 824]


In [173]:
import torch

class CustomConstantSensor(ConstantSensor):
    """Constant sensor that turns its data into 0/1 flags for one concept name.

    Each entry of ``self.data`` is compared with the ``concept`` given at
    construction; equal entries yield 1, all others 0.
    """

    def __init__(self, *pres, keyword=None, edges=None, label=False, device='auto', concept=None, data=None):
        # `keyword` is accepted but not used by this class.
        super().__init__(*pres, data=data, edges=edges, label=label, device=device)
        self.concept_name = concept

    def forward(self, *_) -> Any:
        """Return the 0/1 flags: a tensor when ``self.label == True``, else a list."""
        flags = [1 if entry == self.concept_name else 0 for entry in self.data]
        # Explicit comparison with True is kept on purpose; a plain truthiness
        # test would treat other truthy label values differently.
        if self.label == True:
            return torch.tensor(flags)
        return flags

In [174]:
# Label sensor flagging which entries of `input_t` equal 'PER-Individual'.
sensor_c = CustomConstantSensor(label=True, concept='PER-Individual', data = input_t)

In [175]:
# Run the sensor directly to display the resulting 0/1 flags.
sensor_c.forward()

tensor([0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1,
        1, 0, 0, 0, 1, 0, 0])