In [1]:
from tokenizers import Regex
from tokenizers.normalizers import (
    Lowercase,
    NFD,
    StripAccents,
    Replace,
    Strip,
    Sequence,
)
from transformers import AutoTokenizer
from transformers.tokenization_utils_base import BatchEncoding
from typing import List, Union
import torch
import re

import logging

logger = logging.getLogger(__name__)

TS_TOKENS = {
    "eos_token": "<ts>",
    "pad_token": "<|endoftext|>",
    "additional_special_tokens": ["<speaker1>", "<speaker2>","<speaker3>"],
}


class SpokenNormalizer:
    """
    Normalizer (as in the `tokenizers` framework) which removes punctuation, force lowercase, etc
    """

    def __init__(self):
        self.normalizer = SpokenNormalizer.build_normalizer()

    def normalize_string(self, s):
        s = self.add_whitespace_after_punctuation(s)
        return self.normalizer.normalize_str(s)

    def add_whitespace_after_punctuation(self, s):
        """
        Don't know how to do this with the `tokenizers` library.
        So simple regexp for now...

        Without this function:

            "hello,,,there;everybody.whats     how are you?"
            -> "hellothereeverybodywhats how are you" (once decoded)

        With:

            "hello,,,there;everybody.whats     how are you?"
            -> "hello there everybody whats how are you"

        """
        s = re.sub(r"[\,\.\:\;]+(\w+)", r" \1", s)
        return s

    @staticmethod
    def build_normalizer():
        normalizer = Sequence(
            [
                NFD(),
                Lowercase(),
                StripAccents(),
                Replace(Regex(r'[\.\,\!\?\:\;\)\(\[\]"\-]'), ""),  # punctuation
                Replace(Regex(r"\s\s+"), " "),  # double spaces
                Strip(),
            ]
        )
        return normalizer


class SpokenDialogTokenizer(SpokenNormalizer):
    """
    A tokenizer wrapper for `AutoTokenizer.from_pretrained` which cleans/normalizes text
    strings, removes punctuations and creates `speaker_ids` (like TransferTransfo and similiar to Bert) where each utterance
    is imbued with a token corresponding to the correct speaker (<speaker1> and <speaker2>).

    Should work (kind of) like the normal `Tokenizers` in the `transformers` framework.

    IMPORTANT!!!
    ------------
    Do not have spaces prior to `eos_token`/<ts> in the complete dialog strings.
    The tokenizer inserts EMPTY SPACE!!!

    'hello there <ts>' -> ['hello', 'Ġthere' 'Ġ' '<ts>']

    this is bad!
    -----------------------------

    text_string = 'Yesterday Hello ther, "honey"<ts> godday... you are great<ts> Not as good as you!<ts>'
    o = tokenizer(text_string, return_tensors="pt")

    ----------------------------------------------------

    text_list = [
        'Yesterday Hello ther, "honey"',
        "godday... you are great",
        "Not as good as you!",
    ]
    o2 = tok(text_list, return_tensors="pt")
    print(o2["speaker_ids"] == o["speaker_ids"])
    for inps, spkrs in zip(o["input_ids"], o["speaker_ids"]):
        for i, s in zip(inps, spkrs):
            print(i.item(), s.item())

    ----------------------------------------------------

    list_of_lists = [text_list, text_list[:-1], text_list[:-2]]
    o = tok(text_string)
    o2 = tok(text_list)
    print(o2["speaker_ids"] == o["speaker_ids"])
    for i, s in zip(o["input_ids"], o["speaker_ids"]):
        print(i, s)


    """

    MODELS = [
        "microsoft/DialoGPT-small",
        "microsoft/DialoGPT-medium",
        "microsoft/DialoGPT-large",
        "gpt2",
    ]

    @property
    def unk_token(self):
        return self._tokenizer.unk_token

    @property
    def unk_token_id(self):
        return self._tokenizer.unk_token_id

    @property
    def eos_token(self):
        return self._tokenizer.eos_token

    @property
    def eos_token_id(self):
        return self._tokenizer.eos_token_id

    def __init__(
        self,
        pretrained_model_name_or_path: str = "gpt2",
        normalization=True,
    ):
        super().__init__()
        self.name_or_path = pretrained_model_name_or_path
        if pretrained_model_name_or_path not in self.MODELS:
            print(
                f"WARNING: not tested for {pretrained_model_name_or_path} tread carefully!\n{self.MODELS}"
            )
        self._tokenizer = AutoTokenizer.from_pretrained(
            pretrained_model_name_or_path, max_model_input_sizes=None
        )
        self.normalization = normalization

        # Set to large number to avoid warnings
        # Manually keep track of your models maximum input length
        self._tokenizer.model_max_length = 1e30

        # This goes in logging
        num_added_toks = self._tokenizer.add_special_tokens(TS_TOKENS)

        s = "Tokenizer initialization:\n"
        s += f"\tWe added {num_added_toks} tokens -> Special token map\n"
        for k, v in self._tokenizer.special_tokens_map.items():
            s += f"\t{k}: {v}\n"
        logger.info(s)

        # Turn-shift Token (eos_token)
        # self.eos_token = self._tokenizer.eos_token
        # self.eos_token_id = self._tokenizer.eos_token_id
        # self.unk_token = self._tokenizer.unk_token
        # self.unk_token_id = self._tokenizer.unk_token_id

        # Speaker Tokens
        self.sp1_token = TS_TOKENS["additional_special_tokens"][0]
        self.sp2_token = TS_TOKENS["additional_special_tokens"][1]
        self.sp3_token = TS_TOKENS["additional_special_tokens"][2]
        self.sp1_token_id = self._tokenizer.convert_tokens_to_ids(self.sp1_token)
        self.sp2_token_id = self._tokenizer.convert_tokens_to_ids(self.sp2_token)
        self.sp3_token_id = self._tokenizer.convert_tokens_to_ids(self.sp3_token)

    def __repr__(self):
        return self._tokenizer.__repr__()

    def __len__(self):
        return len(self._tokenizer)

    def normalize(self, string: str) -> str:
        if self.normalization:
            return self.normalize_string(string)
        return string

    def __call__(
        self,
        text: Union[str, List[str], List[List[str]]],
        return_token_type_ids: bool = True,
        include_pre_space: bool = False,
        include_end_ts: bool = True,
        **kwargs,
    ) -> BatchEncoding:
        """
        SpokenDialogTokenizer tokenization.

        `text` can be either a String, a List of Strings, or a List of Lists of Strings. The behaviour of
        this function depends on the `single_dialog` flag.

        `text` is String:           representation of entire dialog (including eos_token)
        `text` is List[str]:        representation of turns in a dialog (no eos_tokens)
        `text` is List[List[str]]:  multiple dialogs (lists of strings) (no eos_tokens)

        """

        # List of lists
        if isinstance(text, list) and isinstance(text[0], list):
            ret = {}
            for text_list in text:
                o = self(
                    text_list,
                    include_pre_space=include_pre_space,
                    include_end_ts=include_end_ts,
                )

                for k, v in o.items():
                    if not k in ret:
                        ret[k] = []
                    ret[k].append(v)
            return ret

        # List of strings, a dialog: ['hello', 'hello to you']
        elif isinstance(text, List):
            dialog_string = ""
            if include_pre_space:
                dialog_string = " "
            dialog_string += self.normalize(text[0])
            if len(text) > 1:
                dialog_string += self.eos_token
                for text_string in text[1:-1]:
                    dialog_string += " " + self.normalize(text_string) + self.eos_token
                dialog_string += " " + self.normalize(text[-1])
            if include_end_ts:
                dialog_string += self.eos_token
            text = dialog_string
        else:
            text = self.normalize(text)

        encoding = self._tokenizer(
            text=text,
            **kwargs,
        )

        if return_token_type_ids:
            encoding["speaker_ids"] = self._extract_speaker_states(
                encoding["input_ids"]
            )
        return encoding

    def _extract_speaker_states(self, input_ids):
        # extract speaker states
        back_to_list = False
        if not isinstance(input_ids, torch.Tensor):
            input_ids = torch.tensor(input_ids).unsqueeze(0)  # with batch dim
            back_to_list = True
        # initialize with speaker 1
        speaker_ids = torch.ones_like(input_ids) * self.sp1_token_id
        batch, eos_idx = torch.where(input_ids == self.eos_token_id)
        for b in batch.unique():
            tmp_eos = eos_idx[batch == b]
            if len(tmp_eos) == 1:
                speaker_ids[b, eos_idx + 1 :] = self.sp2_token_id
            else:
                start = tmp_eos[0]
                for i, eos in enumerate(tmp_eos[1:]):
                    if i % 2 == 0:
                        sp = self.sp2_token_id
                        speaker_ids[b, start + 1 : eos + 1] = sp
                    start = eos
                if i % 2 == 1:  # add sp2 tokens after last eos if i is odd
                    speaker_ids[b, start + 1 :] = self.sp2_token_id

        if back_to_list:
            speaker_ids = speaker_ids.squeeze().tolist()
            if isinstance(speaker_ids, int):
                speaker_ids = [speaker_ids]

        return speaker_ids

    def idx_to_tokens(self, ids):
        def list_ids_to_string(ids):
            return [
                self.convert_tokens_to_string(t)
                for t in self.convert_ids_to_tokens(ids)
            ]

        # tokenize keep tokens
        if isinstance(ids, torch.Tensor):
            ids = ids.tolist()

        if isinstance(ids, list):
            if isinstance(ids[0], list):
                ret = [list_ids_to_string(ids_list) for ids_list in ids]
            else:
                ret = list_ids_to_string(ids)
        else:
            ret = self.convert_tokens_to_string(self.convert_ids_to_tokens(ids))
        return ret

    def pad(self, *args, **kwargs):
        return self._tokenizer.pad(*args, **kwargs)

    def decode(self, *args, **kwargs):
        return self._tokenizer.decode(*args, **kwargs)

    def convert_ids_to_tokens(self, *args, **kwargs):
        return self._tokenizer.convert_ids_to_tokens(*args, **kwargs)

    def convert_tokens_to_ids(self, *args, **kwargs):
        return self._tokenizer.convert_tokens_to_ids(*args, **kwargs)

    def convert_tokens_to_string(self, *args, **kwargs):
        return self._tokenizer.convert_tokens_to_string(*args, **kwargs).strip()


if __name__ == "__main__":

    pretrained_model_name_or_path = "gpt2"
    tokenizer = SpokenDialogTokenizer(pretrained_model_name_or_path)

    turn_list = ["hello there how are you today?"]
    turn_list = ["hello", "good"]
    # turn_list = ["hello there how are you today?", "good", "great"]
    # turn_list = ["hello there how are you today?", "good", "great", 'yes']
    # turn_list = ["hello there how are you today?", "good", "great", 'yes', 'hello']
    # turn_list = ["hello there how are you today?", "good", "great", 'yes', 'hello', 'there']
    out = tokenizer([["hello", "bye"], ["hello", "bye", "you"]], include_end_ts=False)
    print(out)

    # double spaces
    s = "hello,,,there;everybody.whats<ts>     how are you?<ts>"
    print(s)
    t = tokenizer(s)
    print(tokenizer.decode(t["input_ids"]))

    s = "Hello there, how are you today?<ts> I'm doing good thank you!<ts> That's great<ts>"
    outputs = tokenizer(s)

    print(tokenizer.decode(outputs["input_ids"]))
    print(outputs["speaker_ids"])

    #outputs["speaker"]

    turn_list = [
        "hello there how are you doing today?",
        "I'm doing very well thank you, how about you?",
        "well, I'm sad",
    ]

    i = tokenizer(turn_list, include_end_ts=False, include_pre_space=True)["input_ids"]
    d = tokenizer.decode(i)

    very_long_string = ""
    for i in range(150):
        very_long_string += "I'm doing very well thank you, how about you?"
    print(len(very_long_string.split(" ")))

    _ = tokenizer(very_long_string)

    turn_list = [
        "hello there how are you doing today?",
        "I'm doing very well thank you, how about you?",
        "well, I'm sad",
    ]
    tok_out = tokenizer(turn_list, include_end_ts=False)
    #ids_list = tok_out["input_ids"]
    #ids_list = tok_out["input_ids"]
    #ids_tens = torch.tensor(tok_out["input_ids"])
    #t1 = tokenizer.idx_to_tokens(ids_list)
    #t2 = tokenizer.idx_to_tokens(ids_tens)
    #t3 = tokenizer.idx_to_tokens(ids_list[0])

    #outputs = tokenizer(list_of_lists, include_end_ts=False)

    output_strings = []
    for out in outputs["input_ids"]:
        output_strings.append(tokenizer.decode(out))

    #assert output_strings == output_list_of_lists


{'input_ids': [[31373, 50257, 33847], [31373, 50257, 33847, 50257, 345]], 'attention_mask': [[1, 1, 1], [1, 1, 1, 1, 1]], 'speaker_ids': [[50258, 50258, 50259], [50258, 50258, 50259, 50259, 50258]]}
hello,,,there;everybody.whats<ts>     how are you?<ts>
hello there everybody whats<ts> how are you<ts>
hello there how are you today<ts> i'm doing good thank you<ts> that's great<ts>
[50258, 50258, 50258, 50258, 50258, 50258, 50258, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50258, 50258, 50258, 50258]
1201


In [2]:
import pandas as pd
import re
df=pd.read_csv("Transcriptions.csv")
print(df)

                                     me003|i guess.|S|s|s
0                              me011|okay we're on.|S|s|s
1       me011|so just make sure that your wireless mik...
2                                  me003|check one.|S|s|s
3                                  me003|check one.|S|s|s
4       me011|and you should be able to see which one ...
...                                                   ...
108196                                me011|this is|D|s|s
108197                   fe016|it's really helpful.|S|s|s
108198  fe016|i mean adam and don will sort of meet.|S...
108199             fe016|and i think that's great.|S|s|ba
108200                          fe016|very useful.|S|s|ba

[108201 rows x 1 columns]


In [118]:
df.head()

Unnamed: 0,Speaker,Dialogue
0,Bob,"Hi Alan, third time today."
1,Alan,"Yeah, yeah, yeah, it’s a busy clinic…"
2,Bob,[Inaudible 00:03]. Okay.
3,Alan,"…busy clinic. So we’ve got Kate, she’s a midd..."
4,Bob,I don’t…is she known to us already?


In [119]:
text_lists = []
for text in df['Dialogue']:
    text_lists.append(text)

In [121]:
pretrained_model_name_or_path="microsoft/DialoGPT-small"
tokenizer = SpokenDialogTokenizer(pretrained_model_name_or_path)

# tokenizer.eos_token: '<ts>'
# tokenizer.eos_token_id: 50257

# tokenizer.sp1_token: '<speaker1>'
# tokenizer.sp1_token_id: 50258

# tokenizer.sp2_token: '<speaker2>'
# tokenizer.sp2_token_id: 50259



outputs = tokenizer(text_lists)

# print(outputs.keys())
# >>> dict_keys(['input_ids', 'attention_mask', 'speaker_ids'])

# input_ids: word embedding indices
# >>> input_ids: [8505, ...,  220, 50257, 5770, ..., 50257]

# attention_mask: mask to omit `pad_token` in loss
# >>> attention_mask: [1, ...,  1, 1, 1, ..., 1]

# speaker_ids: dialog state embeddings corresponind to speaker id (binary)
# >>> speaker_ids: [50258, ..., 50259, ..., 50258]

decoded_input = tokenizer.decode(outputs['input_ids']) # arugment must be a list

# >>> 'yesterday hello ther honey <ts> godday you are great <ts> not as good as you <ts>'

In [122]:
data=outputs['input_ids']

In [148]:
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import AdamW
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.preprocessing.sequence import pad_sequences

sequences = []
labels = []
seq = []
for token in data:
    seq.append(token)
    if token == 50257:  # Check for the turn-shift token
        labels.pop(0)
        labels.append(1)  # Append 1 to indicate the presence of a turn-shift token
        #sequences.append(seq.copy())  # Append the sequence
        seq = []  # Reset the sequence
    else:
        labels.append(0)  # Append 0 to indicate no turn-shift token
        sequences.append(seq.copy()) 
## Pad sequences to make them of equal length
padded_sequences = pad_sequences(sequences, padding='post')

# Convert sequences and labels to numpy arrays
sequences_np = np.array(padded_sequences)
labels_np = np.array(labels)

# Split the data into train and test sets
X_train, X_test, y_train, y_test = train_test_split(sequences_np, labels_np, test_size=0.2, random_state=42)

# Reshape X_train and X_test to match the expected input shape of LSTM
X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], 1))
X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], 1))

# Define the model
model = Sequential()
model.add(LSTM(128, return_sequences=True, input_shape=(X_train.shape[1], 1)))
model.add(Dropout(0.1))
model.add(LSTM(128, return_sequences=True))
model.add(Dropout(0.1))
model.add(LSTM(128))
model.add(Dense(1, activation='sigmoid'))

# Define the optimizer with learning rate schedule
initial_learning_rate = 6.25e-5
lr_schedule = ExponentialDecay(
    initial_learning_rate, decay_steps=10000, decay_rate=0.96, staircase=True
)
optimizer = AdamW(learning_rate=lr_schedule)

# Compile the model
model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

# Train the model
model.fit(X_train, y_train, epochs=1, batch_size=32)

# Evaluate the model
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print("Test Loss:", test_loss)
print("Test Accuracy:", test_accuracy)

Test Loss: 0.2325638085603714
Test Accuracy: 0.9396268129348755
