In [1]:
import spacy
import pandas as pd

# NOTE(review): `get_senna_policies` below calls a module-level `annotator`, but its
# setup is commented out here. Re-enable these two lines (needs practnlptools) before
# running the Narouei et al. section, otherwise that call raises NameError.
# from practnlptools.tools import Annotator
# annotator = Annotator()

# Shared spaCy English pipeline; later cells read its entities (`.ents`),
# dependency labels (`.dep_`) and lemmas (`.lemma_`).
nlp = spacy.load('en_core_web_sm')


In [2]:
# Load the five `*_acp.csv` document folds as DataFrames. The prediction cells
# further down read the 'input' column of each frame, one sentence per row.
collected = pd.read_csv('../../../../data/document_folds/collected_acp.csv')
cyber = pd.read_csv('../../../../data/document_folds/cyber_acp.csv')
ibm = pd.read_csv('../../../../data/document_folds/ibm_acp.csv')
t2p = pd.read_csv('../../../../data/document_folds/t2p_acp.csv')
acre = pd.read_csv('../../../../data/document_folds/acre_acp.csv')

## Narouei et al.

In [9]:
def postprocess(key, val):
    """Normalise an extracted phrase.

    Args:
        key: Role of the phrase; only 'subject' and 'resource' get their
            leading quantifier/article removed.
        val: The phrase text (callers pass it already lower-cased).

    Returns:
        The cleaned phrase with runs of whitespace collapsed to single spaces.
    """
    leading_words = ('any', 'all', 'every', 'the')

    # '_' becomes ','; hyphens, apostrophes and curly quotes are dropped outright.
    for old, new in (('_', ','), ('-', ''), ("'", ''), ('’', ''), ('”', ''), ('“', '')):
        val = val.replace(old, new)

    if key in ('subject', 'resource'):
        # Each word is tested once, in list order, against the current first
        # word: "all the users" loses both words, "the all users" only "the".
        for word in leading_words:
            head, _, rest = val.partition(' ')
            if head == word:
                val = rest

    # Crude singularisation: drop one trailing 's' unless the text ends in 'ss'.
    # Runs before whitespace is collapsed, so a trailing space protects the 's'.
    if len(val) >= 2 and val.endswith('s') and not val.endswith('ss'):
        val = val[:-1]

    return ' '.join(val.split())

def get_senna_policies(sent):
    """Turn the SRL frames of one sentence into subject/action/resource dicts.

    Relies on the module-level `annotator` (its `getAnnotations(sent)['srl']`
    is iterated as a list of frames keyed by role, e.g. 'V', 'A0', 'A1') and
    on the shared spaCy pipeline `nlp`.

    Args:
        sent: The sentence to analyse.

    Returns:
        A list of {'subject', 'action', 'resource'} dicts, one per
        subject/resource combination of every frame that has a 'V' entry.
        Frames missing 'A0' or 'A1' contribute nothing.
    """
    def candidates(argument):
        # Prefer the entities spaCy finds inside the argument; with none found,
        # the whole argument text is the single candidate.
        entities = nlp(argument).ents
        return list(entities) if len(entities) != 0 else [argument]

    policies = []
    for frame in annotator.getAnnotations(sent)['srl']:
        if 'V' not in frame:
            continue  # frame without a predicate: nothing to extract

        action = frame['V']
        subjects = candidates(frame['A0']) if 'A0' in frame else []
        resources = candidates(frame['A1']) if 'A1' in frame else []

        # str() because entity candidates are spaCy objects, not plain strings.
        policies.extend(
            {
                'subject': postprocess('subject', str(subject).lower()),
                'action': action,
                'resource': postprocess('resource', str(resource).lower()),
            }
            for subject in subjects
            for resource in resources
        )

    return policies


def create_srls(sent):
    """Group the policies extracted from `sent` by their action.

    Args:
        sent: The sentence handed to `get_senna_policies`.

    Returns:
        A dict mapping each action to {'subject': [...], 'resource': [...]},
        in order of first appearance. Values whose string form is 'none' are
        left out; duplicates are kept.
    """
    srls = {}
    for policy in get_senna_policies(sent):
        # First sighting of an action creates its (empty) role lists.
        roles = srls.setdefault(policy['action'], {'subject': [], 'resource': []})
        for role in ('subject', 'resource'):
            if str(policy[role]) != 'none':
                roles[role].append(policy[role])

    return srls

In [None]:
import json
import os

# Run the SENNA-based extraction over every dataset and write one JSON file per
# dataset: a list holding the `create_srls` result of each input sentence, in
# input order. Replaces five copy-pasted stanzas with a single loop.
senna_dir = 'senna'
os.makedirs(senna_dir, exist_ok=True)  # open(..., 'w') does not create missing folders

for dataset_name, dataset in (
    ('collected', collected),
    ('ibm', ibm),
    ('cyber', cyber),
    ('t2p', t2p),
    ('acre', acre),
):
    # `pred_pols` is rebound per dataset, so after the cell it holds the last
    # dataset's predictions, exactly as before.
    pred_pols = [create_srls(sent) for sent in dataset['input'].to_list()]
    with open(f'{senna_dir}/{dataset_name}_senna_pred_srl.json', 'w') as f:
        json.dump(pred_pols, f)

## Xia et al.

In [5]:
from allennlp.predictors.predictor import Predictor
# SRL predictor loaded from AllenNLP's public BERT SRL model archive (the
# 2020.12.15 build named in the URL). `create_srls_xia` calls
# `predictor_srl.predict(sentence=...)` on it.
predictor_srl = Predictor.from_path("https://storage.googleapis.com/allennlp-public-models/structured-prediction-srl-bert.2020.12.15.tar.gz")

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.weight', 'cls.predictions.bias', 'cls.predictions.decoder.weight', 'cls.seq_relationship.bias']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [6]:
import string
    
def DFSUtil(s, visited, l):
    """Depth-first walk over a dependency subtree, collecting tokens into groups.

    Args:
        s: Current token; its `dep_`, `text`, `lefts` and `children` are read.
        visited: Set of tokens already handled; updated in place.
        l: List of token groups (lists); updated in place.

    Grouping rules, by dependency label of `s`:
        * ROOT / conj / appos: start a new group containing `s`.
        * amod / compound / nsubj / poss / npadvmod: insert `s` into the latest
          group, just before its last element. Exception: an amod, compound or
          npadvmod token goes to the very front when the group already has two
          or more tokens and `s.text` matches one of the `lefts` of the group's
          first token.
        * prep / pobj / dobj: append `s` to the latest group.
        * anything else: not recorded, but its children are still visited.

    Raises:
        IndexError: if a token of the second or third kind is met before any
            group exists.
    """
    visited.add(s)
    dep = str(s.dep_)

    if dep in ('ROOT', 'conj', 'appos'):
        l.append([s])

    elif dep in ('amod', 'compound', 'nsubj', 'poss', 'npadvmod'):
        group = l[-1]
        goes_first = (
            dep in ('compound', 'npadvmod', 'amod')
            and len(group) >= 2
            and s.text in [left.text for left in group[0].lefts]
        )
        if goes_first:
            group.insert(0, s)
        else:
            # insert(-1, ...) places the token right before the current last one.
            group.insert(-1, s)

    elif dep in ('prep', 'pobj', 'dobj'):
        l[-1].append(s)

    for child in s.children:
        # Substring test against string.punctuation: a skipped child also
        # takes its whole subtree out of the walk.
        if child not in visited and child.text not in string.punctuation:
            DFSUtil(child, visited, l)
 
def DFS(v):
    """Collect the token groups reachable from `v` and render them as phrases.

    Args:
        v: Token at which the `DFSUtil` walk starts.

    Returns:
        A list of strings, one per group gathered by `DFSUtil`, each being the
        group's token texts joined with single spaces.
    """
    visited = set()
    groups = []
    DFSUtil(v, visited, groups)
    return [' '.join(token.text for token in group) for group in groups]
    
    
def get_root(doc):
    """Return the first token of `doc` whose `dep_` is 'ROOT', or None if there is none."""
    return next((token for token in doc if token.dep_ == 'ROOT'), None)

def postprocess(key, val):
    """Normalise an extracted phrase.

    Args:
        key: Role of the phrase; only 'subject' and 'resource' get their
            leading quantifier/article removed.
        val: The phrase text (callers pass it already lower-cased).

    Returns:
        The cleaned phrase with runs of whitespace collapsed to single spaces.
    """
    leading_words = ('any', 'all', 'every', 'the')

    # '_' becomes ','; hyphens, apostrophes and curly quotes are dropped outright.
    for old, new in (('_', ','), ('-', ''), ("'", ''), ('’', ''), ('”', ''), ('“', '')):
        val = val.replace(old, new)

    if key in ('subject', 'resource'):
        # Each word is tested once, in list order, against the current first
        # word: "all the users" loses both words, "the all users" only "the".
        for word in leading_words:
            head, _, rest = val.partition(' ')
            if head == word:
                val = rest

    # Crude singularisation: drop one trailing 's' unless the text ends in 'ss'.
    # Runs before whitespace is collapsed, so a trailing space protects the 's'.
    if len(val) >= 2 and val.endswith('s') and not val.endswith('ss'):
        val = val[:-1]

    return ' '.join(val.split())

# DFS(get_root(doc))

In [7]:
def collect_acp(tokens, srl, use_dfs = True):
    """Pull subject and resource candidates out of one tagged verb frame.

    Args:
        tokens: The sentence tokens.
        srl: One tag per token, of the form '<prefix>-<label>' or a bare tag
            with no '-' (presumably BIO tags such as 'B-ARG0', 'I-ARG0', 'O').
            Only the 'I' prefix and the 'ARG0' / 'ARG1' labels are acted on here.
        use_dfs: When True, each argument span is parsed with spaCy and split
            into phrases by `DFS`. When False, the raw span text is returned as
            the single candidate (this path used to raise UnboundLocalError).

    Returns:
        A `(subjects, resources)` tuple of string lists taken from the 'ARG0'
        and 'ARG1' spans; a missing span yields `['none']`.

    Note:
        Spans are keyed by label, so if a label occurs in several separate
        spans only the last one survives.
    """
    spans = {}
    prev = ""
    word = []
    for token, tag in zip(tokens, srl):
        prefix, _, label = tag.partition('-')
        if label != prev:
            # Label changed: store the finished span and start a new one.
            spans[prev] = ' '.join(word)
            prev = label
            word = [token]
        elif prefix == 'I':
            # `elif`, so an 'I-' tag that opens a span is not added twice.
            word.append(token)
    spans[prev] = ' '.join(word)

    has_subject = 'ARG0' in spans
    has_resource = 'ARG1' in spans

    if use_dfs:
        subs = DFS(get_root(nlp(spans['ARG0']))) if has_subject else ['none']
        ress = DFS(get_root(nlp(spans['ARG1']))) if has_resource else ['none']
    else:
        subs = [spans['ARG0']] if has_subject else ['none']
        ress = [spans['ARG1']] if has_resource else ['none']

    return subs, ress

def generate_acp(p, dopostprocess = True):
    """Build subject/action/resource dicts from one SRL prediction.

    Args:
        p: Prediction dict; `p['words']` holds the sentence tokens and
            `p['verbs']` a list of frames, each with a 'verb' string and a
            'tags' list that is passed to `collect_acp`.
        dopostprocess: When True, subjects and resources are lower-cased and
            cleaned with `postprocess`; when False they are lower-cased and
            lemmatised with spaCy instead.

    Returns:
        A list of {'subject', 'action', 'resource'} dicts: one per
        subject/resource combination of every frame whose verb is not in the
        skip list. The action is the spaCy lemma of the verb.
    """
    def lemmatize(text):
        # Space-joined lemmas of every token spaCy finds in `text`.
        return ' '.join(token.lemma_ for token in nlp(text))

    tokens = p['words']
    acps = []
    # Verbs that never become an action. Compared against the verb exactly as
    # the predictor reports it.
    # NOTE(review): inflected forms such as 'allows' are therefore not skipped;
    # confirm whether matching on the lemma was intended.
    l = ['feel','look','sound','taste','smell','seem','appear','become','grow','get','turn','fall','go','come','continue','remain','stay',
         'aware','can','could','be able to','may','might','must','dare','need','shall','should','be supposed to','ought to','will','would',
         'be going to','used to', 'is', 'am', 'are','was','were','have','has','had','do','does','did', 'permit','allow','enable','contain','include','consist'
        ]
    for v in p['verbs']:
        if v['verb'] in l:
            continue

        action = lemmatize(v['verb'])

        # Returns subjects and resources for the given action.
        subjects, resources = collect_acp(tokens, v['tags'])

        for s in subjects:
            for r in resources:
                # The per-pair results use their own names. Rebinding the lists
                # being iterated used to make every subject after the first
                # loop over the lemmas of the last resource.
                if dopostprocess:
                    subject = postprocess('subject', s.lower())
                    resource = postprocess('resource', r.lower())
                else:
                    subject = lemmatize(s.lower())
                    resource = lemmatize(r.lower())

                acps.append({'subject': subject, 'action': action, 'resource': resource})

    return acps


def create_srls_xia(sent, dopostprocess = True):
    """Run the SRL predictor on `sent` and group the resulting policies by action.

    Args:
        sent: Sentence passed to `predictor_srl.predict`.
        dopostprocess: Forwarded unchanged to `generate_acp`.

    Returns:
        A dict mapping each action to {'subject': [...], 'resource': [...]},
        in order of first appearance. Values whose string form is 'none' are
        left out; duplicates are kept.
    """
    prediction = predictor_srl.predict(sentence = sent)
    srls = {}
    for policy in generate_acp(prediction, dopostprocess):
        # First sighting of an action creates its (empty) role lists.
        roles = srls.setdefault(policy['action'], {'subject': [], 'resource': []})
        for role in ('subject', 'resource'):
            if str(policy[role]) != 'none':
                roles[role].append(policy[role])

    return srls

In [7]:
import json
import os

# Run the Xia et al. extraction over every dataset and write one JSON file per
# dataset: a list holding the `create_srls_xia` result of each input sentence,
# in input order. Replaces five copy-pasted stanzas with a single loop.
xia_dir = 'xia'
os.makedirs(xia_dir, exist_ok=True)  # open(..., 'w') does not create missing folders

for dataset_name, dataset in (
    ('collected', collected),
    ('cyber', cyber),
    ('ibm', ibm),
    ('acre', acre),
    ('t2p', t2p),
):
    pred_pols = []
    for sent in dataset['input'].to_list():
        try:
            pred_pols.append(create_srls_xia(sent))
        except Exception:
            # `Exception` rather than a bare `except:` so a kernel interrupt
            # stops the cell instead of being treated as a bad sentence.
            # Debugging aid: show the offending sentence and the raw
            # extraction, then stop processing this dataset.
            print(sent)
            print(generate_acp(predictor_srl.predict(sentence = sent)))
            break

    # NOTE(review): after a `break` the file written here holds fewer entries
    # than the dataset has sentences.
    with open(f'{xia_dir}/{dataset_name}_xia_pred_srl.json', 'w') as f:
        json.dump(pred_pols, f)


### Evaluating the demo results

In [2]:
import json

# Load the demo requirements; the next cell iterates over `sents` and feeds
# each element to `create_srls_xia` as one sentence.
with open('../../../../demo/high_level_requirements.json','r') as f:
    sents = json.load(f)

In [8]:
import os

# Run the Xia et al. extraction on the demo sentences and store the
# per-sentence results as one JSON list.
pred_pols = []
for sent in sents:
    try:
        pred_pols.append(create_srls_xia(sent))
    except Exception:
        # `Exception` rather than a bare `except:` so a kernel interrupt stops
        # the cell instead of being treated as a bad sentence.
        # Debugging aid: show the offending sentence and the raw extraction,
        # then stop.
        print(sent)
        print(generate_acp(predictor_srl.predict(sentence = sent)))
        break

# NOTE(review): after a `break` the file written below holds fewer entries
# than there are demo sentences.
demo_results_path = '../../../../demo/results/demo_pred_srl_xia.json'
os.makedirs(os.path.dirname(demo_results_path), exist_ok=True)  # results folder may not exist yet
with open(demo_results_path, 'w') as f:
    json.dump(pred_pols, f)