In [None]:
# default_exp core

In [None]:
# Testing the skipgram
import functools
from nltk.util import skipgrams
from sklearn.feature_extraction.text import CountVectorizer
test_txt = ['Abcd ef ghia.', ' Abcd ef ghia.', 'Abracadabra patas de cabra.']
# Strip surrounding whitespace so that the first two documents become identical.
text = [line.strip() for line in test_txt]
print(text)
# 2-skip-3-grams; given a string, skipgrams iterates over its characters.
skipper = functools.partial(skipgrams, n=3, k=2)
print(list(skipper(text[0])))
# With only 3 documents, max_df=0.9 caps the document frequency at 2.7 documents,
# so min_df=3 raised "max_df corresponds to < documents than min_df".
# min_df=2 keeps the skipgrams shared by the two identical documents.
vectorizer = CountVectorizer(analyzer=skipper, min_df=2, max_df=0.9)
vectorizer.fit(text)
vectorizer.vocabulary_

['Abcd ef ghia.', 'Abcd ef ghia.', 'Abracadabra patas de cabra.']
[('A', 'b', 'c'), ('A', 'b', 'd'), ('A', 'b', ' '), ('A', 'c', 'd'), ('A', 'c', ' '), ('A', 'd', ' '), ('b', 'c', 'd'), ('b', 'c', ' '), ('b', 'c', 'e'), ('b', 'd', ' '), ('b', 'd', 'e'), ('b', ' ', 'e'), ('c', 'd', ' '), ('c', 'd', 'e'), ('c', 'd', 'f'), ('c', ' ', 'e'), ('c', ' ', 'f'), ('c', 'e', 'f'), ('d', ' ', 'e'), ('d', ' ', 'f'), ('d', ' ', ' '), ('d', 'e', 'f'), ('d', 'e', ' '), ('d', 'f', ' '), (' ', 'e', 'f'), (' ', 'e', ' '), (' ', 'e', 'g'), (' ', 'f', ' '), (' ', 'f', 'g'), (' ', ' ', 'g'), ('e', 'f', ' '), ('e', 'f', 'g'), ('e', 'f', 'h'), ('e', ' ', 'g'), ('e', ' ', 'h'), ('e', 'g', 'h'), ('f', ' ', 'g'), ('f', ' ', 'h'), ('f', ' ', 'i'), ('f', 'g', 'h'), ('f', 'g', 'i'), ('f', 'h', 'i'), (' ', 'g', 'h'), (' ', 'g', 'i'), (' ', 'g', 'a'), (' ', 'h', 'i'), (' ', 'h', 'a'), (' ', 'i', 'a'), ('g', 'h', 'i'), ('g', 'h', 'a'), ('g', 'h', '.'), ('g', 'i', 'a'), ('g', 'i', '.'), ('g', 'a', '.'), ('h', 'i', 'a')

ValueError: max_df corresponds to < documents than min_df

# SGNN

> Implementation of Self-Governing Neural Networks for speech act classification

Implementation of the [SGNN paper](https://www.aclweb.org/anthology/D19-1402.pdf) for speech act classification.
This repository is inspired by Guillaume Chevalier's [implementation](https://github.com/guillaume-chevalier/SGNN-Self-Governing-Neural-Networks-Projection-Layer), as well as his [discussion](https://github.com/guillaume-chevalier/SGNN-Self-Governing-Neural-Networks-Projection-Layer/issues/1) with [Sava Kalbachou](https://github.com/thinline).
This version implements some things differently from Guillaume's code, and extends beyond the projection layer all the way to a fully trainable network.

The network is trained to classify the [SwDA corpus](https://web.stanford.edu/~jurafsky/ws97/) utterances according to their speech act. The corpus was pre-processed using Christopher Potts's [project](https://github.com/cgpotts/swda/) related to it; the pre-processed data is included in [`data/swda-acttags-and-text.csv`](data/swda-acttags-and-text.csv) for repeatability.

In [None]:
#hide
from nbdev.showdoc import *

We first import the data from file

In [None]:
import pandas as pd
# NOTE(review): absolute, machine-specific path; the notebook only runs as-is
# on a machine where this file exists at this location.
data_filepath = '/home/andres/repositories/SGNN/data/swda-acttags-and-text.csv'

# Read the whole CSV into a DataFrame kept in the notebook's global state.
data = pd.read_csv(data_filepath)

In [None]:
# See a summary of the data. `describe` is a method: without the call the cell
# only shows the bound-method repr instead of the summary.
data.describe()

Divide the data into training and test sets

In [None]:
#export
import functools
from nltk.util import skipgrams
import numpy as np
import pandas as pd
import random as rand
import scipy.sparse as sp
from sklearn.base import BaseEstimator
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.preprocessing import LabelEncoder
import tensorflow as tf

from nearpy.hashes import RandomBinaryProjections

In [None]:
#export
def import_data(filepath):
    """Load utterance texts and their speech-act tags from a CSV file.

    Args:
        filepath: Location of the CSV file, passed straight to ``pd.read_csv``.
            The file must contain ``Text`` and ``DamslActTag`` columns.

    Returns:
        A tuple ``(text, tags)`` holding the ``Text`` and ``DamslActTag``
        columns, after dropping every row that has a missing value.
    """
    # Read from the argument; the previous version ignored it and always read
    # the module-level ``data_filepath``.
    data = pd.read_csv(filepath)
    data = data.dropna(axis=0)  # Drop rows with NA values

    return data.Text, data.DamslActTag
    
    
def preprocess_data(X, y, test_size=0.2, random_state=None):
    """Encode the labels, split the data and one-hot encode the label splits.

    Args:
        X: Samples, forwarded to ``train_test_split``.
        y: Labels, one per sample; encoded to integers with ``LabelEncoder``.
        test_size: Forwarded to ``train_test_split`` (default 0.2, as before).
        random_state: Forwarded to ``train_test_split``. The default ``None``
            keeps the previous unseeded behaviour; pass an int for a
            repeatable split.

    Returns:
        ``(X_train, X_test, train_labels, test_labels)`` where both label
        arrays are one-hot encoded with one column per encoded class.
    """
    # Convert labels to integer categories
    le = LabelEncoder()
    y = le.fit_transform(y)
    n_classes = len(le.classes_)

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, stratify=y, random_state=random_state
    )

    # Convert categories to one-hot encodings for Keras. The width is pinned to
    # the full class count: inferring it from each split separately yields
    # mismatched shapes when the highest class index is missing from a split.
    train_labels = tf.keras.utils.to_categorical(y_train, num_classes=n_classes)
    test_labels = tf.keras.utils.to_categorical(y_test, num_classes=n_classes)

    return X_train, X_test, train_labels, test_labels

In [None]:
# Load the texts and tags, then hold out 20% of the samples as a test set,
# stratified on the tags.
X, y = import_data(data_filepath)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y)

See the data division

In [None]:
# print(X_train.head(),"\n", y_train.head())

Convert the data labels to one-hot vectors

## Let's build the SGNN pipeline for processing the data

In [None]:
#export
class MyRBP(BaseEstimator, RandomBinaryProjections):
    """scikit-learn style transformer built on nearpy's ``RandomBinaryProjections``.

    ``fit`` (re)creates the random generator from ``rand_seed`` and sizes the
    projection to the number of input columns; ``transform`` projects the
    rows onto the hyperplane normals and returns which projections are positive.
    """

    # Class-level default so ``self.rand_seed`` always resolves.
    rand_seed = None

    def __init__(self, hash_name='hasher', projection_count=1, rand_seed=None):
        RandomBinaryProjections.__init__(self, hash_name, projection_count, rand_seed=rand_seed)
        # Keep the seed on the instance so that get_params()/clone() report the
        # value given to the constructor rather than the class default.
        self.rand_seed = rand_seed

    def fit(self, X, y=None):
        """Prepare the projection for inputs with ``X.shape[1]`` columns.

        Args:
            X: 2-D array-like or sparse matrix; only ``X.shape[1]`` is read.
            y: Ignored; accepted for scikit-learn API compatibility.

        Returns:
            self, as the scikit-learn estimator API expects (``FeatureUnion.fit``
            replaces its transformers with whatever ``fit`` returns).
        """
        # Re-seed here because rand_seed may have been changed through
        # set_params() after construction.
        self.rand = np.random.RandomState(self.rand_seed)
        self.reset(X.shape[1])
        # Drop the cached CSR copy of the normals; hash_vector rebuilds it on
        # demand, so a refit can never reuse a stale projection matrix.
        self.normals_csr = None
        return self

    def transform(self, X):
        """Return the boolean projection signs of ``X`` (see ``hash_vector``)."""
        return self.hash_vector(X)

    def fit_transform(self, X, y=None):
        """Fit on ``X`` and return its transformed version."""
        return self.fit(X, y).transform(X)

    def hash_vector(self, v, querying=False):
        """
        Projects ``v`` onto the hyperplane normals and returns a boolean
        array/matrix that is True where the projection is positive.

        Unlike what the previous docstring stated, no string key is built: the
        result has one column per projection. ``querying`` is not used in this
        implementation.
        """
        if sp.issparse(v):
            # If vector is sparse, make sure we have the CSR representation
            # of the projection matrix
            if self.normals_csr is None:
                self.normals_csr = sp.csr_matrix(self.normals)
            # Make sure that we are using CSR format for multiplication
            if not sp.isspmatrix_csr(v):
                v = sp.csr_matrix(v)
            # Rows of v times the transposed normals: one column per projection
            projection = v.dot(self.normals_csr.transpose())
        else:
            # Project the dense input onto all hyperplane normals
            projection = np.dot(v, self.normals.transpose())
        # Binary key: sign of every projection
        return projection > 0

In [None]:
#export
def build_input_layer(T=80, d=14, char_ngram_range=(1, 4), random_state=None):
    """Build the (unfitted) preprocessing pipeline: skipgram counts + T hashers.

    Args:
        T: Number of ``MyRBP`` hashers placed in the feature union.
        d: ``projection_count`` given to every hasher. With the defaults the
            union outputs T * d = 80 * 14 = 1120 columns.
        char_ngram_range: Currently unused (the matching vectorizer option is
            disabled); kept so existing callers keep working.
        random_state: Optional seed for drawing the per-hasher seeds. ``None``
            (default) draws them from the global ``random`` generator, as
            before; pass a value to get the same seeds on every call.

    Returns:
        A ``Pipeline`` made of a ``CountVectorizer`` over character
        2-skip-3-grams followed by a ``FeatureUnion`` of ``T`` hashers.
    """
    skipper = functools.partial(skipgrams, n=3, k=2)  # 2-skip-3-grams
    vectorizer_params = {
        'char_term_frequency__analyzer': skipper,
        'char_term_frequency__min_df': 3,
        'char_term_frequency__max_df': 0.9,
        'char_term_frequency__max_features': int(1e7),
    }

    # Every hasher needs a different seed. Sampling without replacement from
    # [0, T*100] guarantees that; independent randint draws could repeat.
    rng = rand if random_state is None else rand.Random(random_state)
    rand_seeds = rng.sample(range(T * 100 + 1), T)

    hasher_names = ['random_binary_projection_hasher_{}'.format(t) for t in range(T)]

    hasher_params = {}
    for t, name in enumerate(hasher_names):
        prefix = 'union__{}__'.format(name)
        hasher_params[prefix + 'projection_count'] = d
        hasher_params[prefix + 'hash_name'] = 'hasher' + str(t)
        # The seed is applied through set_params (after construction); MyRBP.fit
        # reads it when it creates its random generator.
        hasher_params[prefix + 'rand_seed'] = rand_seeds[t]

    preprocessor = Pipeline([
        ('char_term_frequency', CountVectorizer()),
        ('union', FeatureUnion([(name, MyRBP()) for name in hasher_names])),
    ])

    preprocessor.set_params(**vectorizer_params, **hasher_params)
    return preprocessor

In [None]:
# Smoke test: build the preprocessing pipeline with its default settings and
# fit/transform a few toy strings; the transformed output is the cell's result.
preprocessor = build_input_layer()
preprocessor.fit_transform(["Esta es na pruebs","fadsf", "Oh no fasd", "fasdfaaaaaaaa" ])

In [None]:
#export
def build_keras_model(train_labels, input_dim=1120):
    """Build and compile the dense classifier that consumes the projections.

    Args:
        train_labels: One-hot encoded training labels; ``train_labels.shape[1]``
            sets the size of the softmax output layer.
        input_dim: Number of input features. The default 1120 matches the
            T * d = 80 * 14 defaults of ``build_input_layer``; pass the actual
            feature width when the input layer is built with other values.

    Returns:
        A compiled ``tf.keras.Sequential`` model (categorical cross-entropy
        loss, accuracy metric).
    """
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(256, activation=tf.nn.sigmoid, input_shape=(input_dim,)),
        tf.keras.layers.Dropout(0.25),
        tf.keras.layers.Dense(256, activation=tf.nn.sigmoid),
        tf.keras.layers.Dropout(0.25),
        tf.keras.layers.Dense(train_labels.shape[1], activation=tf.nn.softmax),
    ])

    # Cosine annealing decay of the learning rate, starting at 0.025
    lr_schedule = tf.keras.experimental.CosineDecay(0.025, decay_steps=1000000)
    # NOTE(review): nesterov=True is passed without a momentum value; with the
    # optimizer's default momentum this presumably has no effect - confirm
    # whether a non-zero momentum was intended.
    opt = tf.keras.optimizers.SGD(nesterov=True, learning_rate=lr_schedule)
    model.compile(loss='categorical_crossentropy',
                  optimizer=opt,
                  metrics=['accuracy'])

    return model

In [None]:
#export
def main():
    """Run the whole experiment: load, featurize, train and evaluate.

    Reads the module-level ``data_filepath``, ``EPOCHS`` and ``BATCH_SIZE``.
    Nothing is returned; the Keras calls report their own progress.
    """
    # Data: raw columns -> train/test split with one-hot labels
    texts, tags = import_data(data_filepath)
    X_train, X_test, train_labels, test_labels = preprocess_data(texts, tags)

    # Features: fit the projection pipeline on the training texts only,
    # then apply the fitted pipeline to the test texts
    featurizer = build_input_layer()
    train_features = featurizer.fit_transform(X_train)
    test_features = featurizer.transform(X_test)

    # Model: train on the training features, then score on the held-out ones
    classifier = build_keras_model(train_labels)
    classifier.fit(
        train_features,
        train_labels,
        epochs=EPOCHS,
        batch_size=BATCH_SIZE,
    )
    classifier.evaluate(test_features, test_labels)

In [None]:
#export
# Training configuration read by main()
EPOCHS = 200
BATCH_SIZE = 100
# NOTE(review): absolute, machine-specific path to the pre-processed corpus.
data_filepath = "/home/andres/repositories/SGNN/data/swda-acttags-and-text.csv"

# This cell is exported: guard the call so that importing the generated module
# does not start a training run. Inside the notebook kernel __name__ is
# "__main__", so running the cell still trains the model.
if __name__ == "__main__":
    main()
