In [None]:
import networkx as nx
from networkx.drawing.nx_pydot import write_dot

import matplotlib.pyplot as plt
import numpy as np
from IPython.display import display_png
import json
from pathlib import Path
import scipy

from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score
from sklearn.preprocessing import OneHotEncoder

import torch
import dgl
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from sentence_transformers import SentenceTransformer
from karateclub.node_embedding.neighbourhood.deepwalk import DeepWalk
from nodevectors import Node2Vec
from gensim.models import Word2Vec
from xgboost import XGBClassifier

In [3]:
def flatten(list_of_list):
    return [item for sublist in list_of_list for item in sublist]

def load_data(transcription_id, labels, data_type='training'):
    with open(f'{data_type}/{transcription_id}.json', 'r') as json_file:
        discourse_data = json.load(json_file)

    with open(f'{data_type}/{transcription_id}.txt', 'r') as txt_file:
        discourse_types = [line.strip().split() for line in txt_file]

    G = nx.DiGraph()
    for entry in discourse_data:
        G.add_node(entry['index'], speaker=entry['speaker'], text=entry['text'])

    for discourse_relation in discourse_types:
        node_from = int(discourse_relation[0])
        node_to = int(discourse_relation[-1])
        relation_type = discourse_relation[1]

        if G.has_node(node_from) and G.has_node(node_to):
            G.add_edge(node_from, node_to, type=relation_type)

    label = labels[transcription_id]
    
    return G, label

def precompute_embeddings(bert, data):
    embeddings = []
    for G, _ in data:
        deepwalk = DeepWalk()
        deepwalk.fit(G)
        embedding = deepwalk.get_embedding()
        embeddings.append(embedding)
    return flatten(embeddings)

# training and test sets of transcription ids
training_set = ['ES2002', 'ES2006','IS1003','IS1005', 'TS3012','TS3005']
#, 'ES2007', 'ES2008', 'ES2009', 'ES2010', 'ES2012', 'ES2013','ES2015',  'IS1004', 'IS1006', 'IS1007', 'TS3008', 'TS3009', 'TS3010', 
training_set = flatten([[m_id + s_id for s_id in 'abcd'] for m_id in training_set])

training_set.remove('IS1005d')
training_set.remove('TS3012c')

test_set = ['ES2005','IS1002', 'TS3011']
#'ES2016', 'IS1000', 'IS1001', 
            #, 'ES2004', 'ES2011']
            #, 'ES2014', 'IS1008', 'IS1009','TS3003', 'TS3004', 'TS3006', 'TS3007']

test_set = flatten([[m_id + s_id for s_id in 'abcd'] for m_id in test_set])
test_set.remove('IS1002a')

# training graph preprocessing
bert = SentenceTransformer('all-MiniLM-L6-v2')

with open("training_labels.json", "r") as file:
    training_labels = json.load(file)

y_training = []
X_training_data = [load_data(transcription_id, training_labels) for transcription_id in training_set]
y_training = flatten([label for _, label in X_training_data])
X_training = precompute_embeddings(bert, X_training_data)
print('training preprocessing done')

# test graph preprocessing
y_test = []
X_test_data = [load_data(transcription_id, training_labels, data_type='training') for transcription_id in test_set]
y_test = flatten([label for _, label in X_test_data])
X_test = precompute_embeddings(bert, X_test_data)

print('test preprocessing done')

training preprocessing done
test preprocessing done


In [3]:
# model training
clf = DecisionTreeClassifier(random_state=0)
clf.fit(X_training, y_training)
print('training done')

# test
y_pred = clf.predict(X_test)

f1_score(y_test, y_pred)

training done


0.18216318785578745

In [2]:
# model training
model = XGBClassifier(random_state=0)
model.fit(X_training, y_training)
print('training done')

# test
y_pred2 = model.predict(X_test)

f1_score(y_test, y_pred2)

NameError: name 'X_training' is not defined

In [5]:
# model training
model = RandomForestClassifier(random_state=0)
model.fit(X_training, y_training)
print('training done')

# test
y_pred3 = model.predict(X_test)

f1_score(y_test, y_pred3)

training done


0.0

In [31]:
# Assuming you have lists of speakers, texts, and labels
speakers = ['speaker1', 'speaker2', 'speaker3', ...]
texts = ['text1', 'text2', 'text3', ...]
labels = [0, 1, 0, ...]  # Replace with your actual labels

# Combine speakers, texts, and labels into a list of tuples
data = list(zip(speakers, texts, labels))

# Split data into training and testing sets
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)
y_train = [label for _, _, label in train_data]
y_test = [label for _, _, label in test_data]

# Function to create DGL graphs and add node features
def create_dgl_graphs(data):
    graphs = [dgl.DGLGraph() for _ in data]

    for g, (speaker, text, _) in zip(graphs, data):
        # Add nodes
        num_nodes = 2  # Number of nodes in each graph (assuming 2 nodes)
        g.add_nodes(num_nodes)

        # Add edges (assuming an edge between node 0 and node 1)
        g.add_edge(0, 1)

        # Add node features
        speakers_encoded = torch.tensor([[speaker == s for s in set(speakers)]]).float()
        texts_encoded = torch.tensor([[text == t for t in set(texts)]]).float()
        
        g.ndata['speaker'] = speakers_encoded
        g.ndata['text'] = texts_encoded

    return graphs

# Create DGL graphs and add node features
train_graphs = create_dgl_graphs(train_data)
test_graphs = create_dgl_graphs(test_data)

# Define the GCN model
class GCN(nn.Module):
    def __init__(self, in_feats, hidden_size, num_classes):
        super(GCN, self).__init__()
        self.conv1 = dgl.nn.GraphConv(in_feats, hidden_size)
        self.conv2 = dgl.nn.GraphConv(hidden_size, hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, g):
        x = F.relu(self.conv1(g, g.ndata['speaker']))
        x = F.relu(self.conv2(g, x))
        x = dgl.mean_nodes(g, 'x')
        x = self.fc(x)
        return x

# Create the GCN model
in_feats = train_graphs[0].ndata['speaker'].shape[1] + train_graphs[0].ndata['text'].shape[1]  # Number of input features
hidden_size = 64  # Number of hidden units
num_classes = 2  # Number of output classes
model = GCN(in_feats, hidden_size, num_classes)

# Convert the training labels to PyTorch tensors
y_train_tensor = torch.tensor(y_train)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Training loop
for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    outputs = [model(g) for g in train_graphs]
    loss = sum([criterion(output, y) for output, y in zip(outputs, y_train_tensor)])
    loss.backward()
    optimizer.step()

# Evaluation
model.eval()
with torch.no_grad():
    test_outputs = [model(g) for g in test_graphs]
    y_pred = torch.argmax(torch.cat(test_outputs), dim=1)

# Calculate F1 score
f1 = f1_score(y_test, y_pred)
print('F1 score:', f1)

TypeError: tuple indices must be integers or slices, not str

In [None]:
from lightgbm import LGBMClassifier

# model training
model = LGBMClassifier(random_state=0)
model.fit(X_training, y_training)
print('training done')

# test
y_pred_LGBM = model.predict(X_test)

f1_score(y_test, y_pred_LGBM)

[LightGBM] [Info] Number of positive: 3177, number of negative: 16089
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.008760 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 32640
[LightGBM] [Info] Number of data points in the train set: 19266, number of used features: 128
[LightGBM] [Info] [binary:BoostFromScore]: pavg=0.164902 -> initscore=-1.622198
[LightGBM] [Info] Start training from score -1.622198
training done


0.0

In [8]:


# Assuming X_data is a list of NetworkX DiGraphs with 'speaker' and 'text' attributes

# Function to create DGL graphs and add node features
def create_dgl_graphs(data):
    graphs = [dgl.from_networkx(G) for G in data]

    for g in graphs:
        # Extract 'speaker' and 'text' attributes
        speakers = [g.nodes[n]['speaker'] for n in g.nodes()]
        texts = [g.nodes[n]['text'] for n in g.nodes()]

        # Encode 'speaker' and 'text' using OneHotEncoder
        encoder = OneHotEncoder(sparse=False, handle_unknown='ignore')
        speakers_encoded = encoder.fit_transform([[s] for s in speakers])
        texts_encoded = encoder.fit_transform([[t] for t in texts])

        # Add one-hot encoded features to the node data of DGL graphs
        g.ndata['speaker'] = torch.tensor(speakers_encoded).float()
        g.ndata['text'] = torch.tensor(texts_encoded).float()

    return graphs

# Create DGL graphs and add node features
train_graphs = create_dgl_graphs(X_training)
test_graphs = create_dgl_graphs(X_test)

# Define the GCN model
class GCN(nn.Module):
    def __init__(self, in_feats, hidden_size, num_classes):
        super(GCN, self).__init__()
        self.conv1 = dgl.nn.GraphConv(in_feats, hidden_size)
        self.conv2 = dgl.nn.GraphConv(hidden_size, hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, g):
        x = F.relu(self.conv1(g, g.ndata['speaker']))
        x = F.relu(self.conv2(g, x))
        x = dgl.mean_nodes(g, 'x')
        x = self.fc(x)
        return x

# Create the GCN model
in_feats = train_graphs[0].ndata['speaker'].shape[1] + train_graphs[0].ndata['text'].shape[1]  # Number of input features
hidden_size = 64  # Number of hidden units
num_classes = 2  # Number of output classes
model = GCN(in_feats, hidden_size, num_classes)

# Convert the training labels to PyTorch tensors
y_train_tensor = torch.tensor(y_train)

# Define the loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Training loop
for epoch in range(100):
    model.train()
    optimizer.zero_grad()
    outputs = [model(g) for g in train_graphs]
    loss = sum([criterion(output, y) for output, y in zip(outputs, y_train_tensor)])
    loss.backward()
    optimizer.step()

# Evaluation
model.eval()
with torch.no_grad():
    test_outputs = [model(g) for g in test_graphs]
    y_pred = torch.argmax(torch.cat(test_outputs), dim=1)

# Calculate F1 score
f1 = f1_score(y_test, y_pred)
print('F1 score:', f1)


AttributeError: 'numpy.ndarray' object has no attribute 'is_directed'