Import dependencies:

In [1]:
from typing import Tuple, List, Iterable
from pydot import Dot, graph_from_dot_data, Edge
from graphviz.graphs import BaseGraph
from graphviz import Source
import amrlib
from amrlib.graph_processing.amr_plot import AMRPlot
import numpy as np
import pandas as pd
import csv, pickle
from tqdm.notebook import tqdm

Extract nodes and edges from AMR graphs.

In [3]:
#cite: https://stackoverflow.com/questions/47426249/finding-list-of-edges-in-graphviz-in-python 
def get_graph_dot_obj(graph_spec) -> List[Dot]:
    """Get a dot (graphs) object list from a variety 
    of possible sources (postelizing inputs here)"""
    _original_graph_spec = graph_spec
    if isinstance(graph_spec, (BaseGraph, Source)):
        # get the source (str) from a graph object
        graph_spec = graph_spec.source
    if isinstance(graph_spec, str):
        # get a dot-graph from dot string data
        graph_spec = graph_from_dot_data(graph_spec)
    # make sure we have a list of Dot objects now
    assert isinstance(graph_spec, list) and all(
        isinstance(x, Dot) for x in graph_spec
    ), (
        f"Couldn't get a proper dot object list from: {_original_graph_spec}. "
        f"At this point, we should have a list of Dot objects, but was: {graph_spec}"
    )
    return graph_spec

def get_edges(graph_spec, label = False):
    """Get a list of edges for a given graph (or list of lists thereof).
    If ``postprocess_edges`` is ``None`` the function will return ``pydot.Edge`` objects from
    which you can extract any information you want.
    By default though, it is set to extract the node pairs for the edges, and you can
    replace with any function that takes ``pydot.Edge`` as an input.
    """
    graphs = get_graph_dot_obj(graph_spec)
    n_graphs = len(graphs)
    if n_graphs > 1:
        return [get_edges(graph) for graph in graphs]
    elif n_graphs == 0:
        raise ValueError(f"Your input had no graphs")
    else:
        graph = graphs[0]
        edges = graph.get_edges()
        edges_list = []
        if not label:
            for edge in edges:
                r1, r2 = graph.get_node(edge.get_source())[0].get_label().strip('\"').strip('\\').strip('\"'), graph.get_node(edge.get_destination())[0].get_label().strip('\"').strip('\\').strip('\"')
                if '/' in r1:
                    r1 = r1.split('/')[1]
                elif '\\' in r1:
                    r1 = r1.split('\\')[0]
                
                if '/' in r2:
                    r2 = r2.split('/')[1]
                elif '\\' in r1:
                    r2 = r2.split('\\')[0]

                edges_list.append([r1,r2])
        else:
            for edge in edges:
                r1, r2, r3 = graph.get_node(edge.get_source())[0].get_label().strip('\"').strip('\\').strip('\"'), graph.get_node(edge.get_destination())[0].get_label().strip('\"').strip('\\').strip('\"'), edge.get_label().strip('\"')[1:]
                if '/' in r1:
                    r1 = r1.split('/')[1]
                elif '\\' in r1:
                    r1 = r1.split('\\')[0]
                
                if '/' in r2:
                    r2 = r2.split('/')[1]
                elif '\\' in r1:
                    print("called")
                    r2 = r2.split('\\')[0]

                edges_list.append([r1,r2,r3])
        
        return edges_list

Save large intermediate results (Only used for the first run). 

In [None]:
with open('train_AMR.csv', 'r') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader, None)
    g_train = []
    for row in tqdm(csv_reader, total=1600):
        AP = AMRPlot()
        AP.build_from_graph(entry = row)
        edges = get_edges(AP.graph, label=True)
        g_train.append(edges)

    np.save('g_train',g_train)
    print(g_train[1])

len(g_train)

In [None]:
with open('test_AMR.csv', 'r') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter=',')
    next(csv_reader, None)
    g_test = []
    for row in tqdm(csv_reader, total=400):
        AP = AMRPlot()
        AP.build_from_graph(entry = row)
        edges = get_edges(AP.graph, label=True)
        g_test.append(edges)

    np.save('g_test',g_test)
    print(g_test[1])

len(g_test)

  0%|          | 0/400 [00:00<?, ?it/s]

ignoring epigraph data for duplicate triple: ('b', ':mod', 'p3')
ignoring epigraph data for duplicate triple: ('c', ':ARG0', 'h')
ignoring epigraph data for duplicate triple: ('w2', ':ARG0', 'p')


[['create-01', 'company', 'ARG0'], ['company', 'name', 'name'], ['create-01', 'job', 'ARG1'], ['job', 'new-01', 'ARG1-of'], ['job', 'multiple', 'quant'], ['create-01', 'footprint', 'location'], ['footprint', 'company', 'poss'], ['footprint', 'country', 'location'], ['country', 'name', 'name'], ['create-01', 'increase-01', 'purpose'], ['increase-01', 'company', 'ARG0'], ['increase-01', 'effort-01', 'ARG1'], ['effort-01', 'company', 'ARG0'], ['effort-01', 'and', 'ARG1'], ['and', 'logistics', 'op1'], ['and', 'distribute-01', 'op2'], ['increase-01', 'surge-01', 'time'], ['surge-01', 'demand-01', 'ARG1'], ['surge-01', 'crisis', 'prep-amid'], ['crisis', 'coronavirus', 'mod'], ['name', 'Aldi', 'op1'], ['multiple', '1000', 'op1'], ['name', 'UK', 'op1']]


  arr = np.asanyarray(arr)


400

In [2]:
gtrs = np.load('g_train.npy',allow_pickle=True)
gtes = np.load('g_test.npy',allow_pickle=True)
gall = np.concatenate((gtrs, gtes), axis=0)
gtrs.shape, gtes.shape, gall.shape, type(gtrs)

((1600,), (400,), (2000,), numpy.ndarray)

Generate inventories for words and edges.

In [3]:
word_set = list({ts[i] for g in gall for ts in g for i in range(2)})
edge_set = list({ts[2] for g in gall for ts in g})
word_set.sort()
edge_set.sort()
word_to_id = dict(zip(word_set,[i for i in range(len(word_set))]))
edge_to_id = dict(zip(edge_set,[i for i in range(len(edge_set))]))
Vsize, Esize = len(word_to_id), len(edge_to_id)
Vsize, Esize

(5138, 109)

Data Embedding

In [4]:
def data_embedding(edges):
    # for a single tweet amr    
    # print(edges,"\n")
    nodes = list({edge[i] for edge in edges for i in range(2)})
    nodes_to_id = dict(zip(nodes,[i for i in range(len(nodes))]))
    # print(nodes_to_id,"\n")
    edge_index = [[nodes_to_id[edge[0]] for edge in edges], [nodes_to_id[edge[1]] for edge in edges]]
    x, edge_attr = [], []
    for node in nodes_to_id.keys():
        vector = np.zeros(Vsize)
        # one-hot vector
        vector[word_to_id[node]] = 1.0
        x.append(vector)

    for edge in edges:
        vector = np.zeros(Esize)
        # one-hot vector
        vector[edge_to_id[edge[2]]] = 1.0
        edge_attr.append(vector)

    return np.array(x), np.array(edge_index), np.array(edge_attr)

Get AMR classes (labels).

In [7]:
def label_converter(labels):
    nlabels = []
    for label in labels:
        if label == 'Positive':
            nlabels.append(2)
        elif label == 'Negative':
            nlabels.append(0)
        else:
            nlabels.append(1)
    return nlabels

with open('train_label.csv', 'r') as train_label, open('test_label.csv', 'r') as test_label:
    y_train = train_label.read().split('\n')
    y_test = test_label.read().split('\n')
    y_train = label_converter(y_train)
    y_test = label_converter(y_test)

len(y_train), len(y_test), y_train[2]

(1600, 400, 1)

Generate datasets for GNN.

In [8]:
import torch
from torch_geometric.data import Data

def get_dataset(graph,labels):
    dataset = []
    for i in range(len(graph)):
        x, edge_index, edge_attr = data_embedding(gtrs[i])
        x = torch.tensor(x, dtype=torch.float)
        edge_index = torch.tensor(edge_index, dtype=torch.long)
        dataset.append(Data(x=x, edge_index=edge_index, edge_attr=edge_attr, y=[labels[i]]))
    return dataset

train_dataset = get_dataset(gtrs,y_train)
test_dataset = get_dataset(gtes, y_test)
len(train_dataset), len(test_dataset)

(1600, 400)

In [11]:
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.datasets import Planetoid

class GCN(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = GCNConv(Vsize, 16)
        self.conv2 = GCNConv(16, 3)

    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        x = F.dropout(x, training=self.training)
        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = GCN().to(device)
# print(data.x.shape)
optimizer = torch.optim.Adam(model.parameters(), lr=0.01, weight_decay=5e-4)
model.train()
for epoch in tqdm(range(10), total=10):
    for i in range(1600):
        # print(i)
        if not gtrs[i]:
            continue
        data = train_dataset[i]
        optimizer.zero_grad()
        # print(data)
        out = model(data)
        # print(data)
        # print(out.shape, torch.reshape(torch.tensor(data.y),out.shape))
        loss = F.nll_loss(out, torch.tensor(data.y*np.ones(out.shape[0]), dtype=torch.long))
        loss.backward()
        # print(loss)
        optimizer.step()


# correct = (pred == data.y*np.ones(pred.shape[0])).sum()
# acc = int(correct) / int()
# print(f'Accuracy: {acc:.4f}')

  0%|          | 0/10 [00:00<?, ?it/s]

tensor([2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
        2, 2, 2, 2]) [2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2.
 2. 2. 2. 2.]


In [12]:
data = test_dataset[8]
model.eval()
pred = model(data).argmax(dim=1)
print(pred, data.y*np.ones(pred.shape[0]))

tensor([2, 2, 0, 2, 0, 2, 0, 0]) [0. 0. 0. 0. 0. 0. 0. 0.]
