# End product

Instead of using FastAPI, this notebook is used to build the proof of concept, because there was not enough time in this project to build an API. The notebook combines all the separate products that were researched during this project.

The products:
* The emotion detection model
* The translators
* The pre-trained chatbot
* The retrieval based output with the feedback algorithm

For each product, a class is defined so that it can be instantiated as an object and called.

The emotion detection model is saved in the creator's Google Drive. To make it work, change the path to the location where the model is saved.

Note: this notebook was made in Google Colab, and all file paths point to files in Google Drive. If you run it elsewhere, change these paths so that everything connects correctly.
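A minimal sketch of keeping the Drive paths in one place, so only one line has to change when the notebook runs outside Colab. The environment variable `CHATBOT_BASE_DIR` and the helper `project_path` are hypothetical; the default base directory and the two relative paths are the ones used further down in this notebook.

```python
import os

# Hypothetical: override the base directory with an environment variable outside Colab
BASE_DIR = os.environ.get("CHATBOT_BASE_DIR", "drive/MyDrive/cimsolutions_emotional_chatbot")

def project_path(*parts: str) -> str:
    """Join path parts onto the project base directory."""
    return os.path.join(BASE_DIR, *parts)

# The two files this notebook loads from Google Drive
MODEL_PATH = project_path("ed_model", "robbert_model.pth")
DB_PATH = project_path("ed_output", "ed_outputs.db")
print(MODEL_PATH)
print(DB_PATH)
```

The model and database cells could then use `MODEL_PATH` and `DB_PATH` instead of hard-coded strings.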

In [None]:
# necessary libraries
!pip install transformers
!pip install torch
!pip install scikit-learn
!pip install pandas
!pip install sentencepiece
!pip install tqdm


In [None]:
# necessary imports
from transformers import AutoTokenizer, BlenderbotSmallForConditionalGeneration, AutoModelForSeq2SeqLM
from transformers import RobertaTokenizer
import tqdm
import torch
from torch import cuda
import sqlite3 as sql
from google.colab import drive
import random as rd
import sentencepiece
import os
import re
from typing import List

In [None]:
# connect to drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# connect cuda to gpu
device = 'cuda' if cuda.is_available() else 'cpu'

### Emotion detection model

Retrieve the emotion model to make predictions.

In [None]:
# retrieve robbert tokenizer
tokenizer = RobertaTokenizer.from_pretrained("pdelobelle/robbert-v2-dutch-base", truncation=True, do_lower_case=True)


In [None]:
# retrieve model using path
path = "drive/MyDrive/cimsolutions_emotional_chatbot/ed_model/robbert_model.pth"
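# map_location lets a model that was saved on a GPU load on a CPU-only runtime as well
model = torch.load(path, map_location=device)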
model.eval()

RobertaForSequenceClassification(
  (roberta): RobertaModel(
    (embeddings): RobertaEmbeddings(
      (word_embeddings): Embedding(40000, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): RobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x RobertaLayer(
          (attention): RobertaAttention(
            (self): RobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): RobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
             

In [None]:
""" This class contains the robbert model. which detects an emotion. emotions are labeled and send back to the user."""
class RobbertModel:
  def __init__(self, model, tokenizer, device) -> None:
    self.model =  model
    self.tokenizer = tokenizer
    self.labels = {0: 'neutral', 1:'joy', 2:'fear', 3:'anger', 4:'sad', 5:'love', 6:"other"}
    self.device = device

  def convert(self, text: str):
    # tokenize text to make predictions
    tokenized = self.tokenizer(text)

    tt = {
    'ids': torch.tensor(tokenized['input_ids'], dtype=torch.long),
    'mask': torch.tensor(tokenized['attention_mask'], dtype=torch.long),
    }

    ids = tt['ids'].to(self.device, dtype = torch.long)
    mask = tt['mask'].to(self.device, dtype = torch.long)
    return ids, mask

  def predict(self, text: str) -> str:
    ids, mask = self.convert(text)
    output = self.model(ids.unsqueeze(0), mask.unsqueeze(0))
    emotion =  self.labels[output.logits.argmax(1).item()]

    return emotion

In [None]:
robbert = RobbertModel(model, tokenizer, device)

In [None]:
# test
robbert.predict("ik ben blij")

'love'
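The prediction step in the RobbertModel class comes down to taking the index of the largest logit and looking it up in the label dictionary. The sketch below shows that step in plain Python; the three-label dictionary and the logit values are made up for illustration, and `logits_to_label` is a hypothetical helper, not part of the notebook.

```python
from typing import Dict, List

# Illustrative subset of an index -> emotion mapping like the one in RobbertModel
LABELS: Dict[int, str] = {0: "neutral", 1: "joy", 2: "love"}

def logits_to_label(logits: List[float], labels: Dict[int, str]) -> str:
    """Return the label of the highest logit, like logits.argmax(1) for a single sentence."""
    best_index = max(range(len(logits)), key=lambda i: logits[i])
    return labels[best_index]

# Made-up logits for one sentence; index 2 has the highest value
print(logits_to_label([0.1, 1.2, 2.4], LABELS))  # love
```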

### Translators

Create a translation class that contains both the English and the Dutch translator, so that both translation directions can be called through one class.
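The class picks the translation direction from the target language code. A minimal sketch of the same dispatch with a dictionary, using stand-in functions instead of the Helsinki-NLP models so it runs without downloads; `to_english`, `to_dutch`, `TRANSLATORS` and `translate_to` are hypothetical names.

```python
from typing import Callable, Dict

# Hypothetical stand-ins for the real translation models
def to_english(text: str) -> str:
    return "[en] " + text

def to_dutch(text: str) -> str:
    return "[nl] " + text

# Target language code -> translation function
TRANSLATORS: Dict[str, Callable[[str], str]] = {"en": to_english, "nl": to_dutch}

def translate_to(text: str, language: str) -> str:
    """Translate text into the target language, failing loudly on unknown codes."""
    if language not in TRANSLATORS:
        raise ValueError(f"unsupported target language: {language!r}")
    return TRANSLATORS[language](text)

print(translate_to("ik ben blij", "en"))  # [en] ik ben blij
```

Raising a ValueError for an unknown code keeps an error message from being passed on to the chatbot as if it were a translated sentence.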

In [None]:
# import models and tokenizers
tokenizer_en = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-nl-en")
tokenizer_nl = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-en-nl")


In [None]:
model_en = AutoModelForSeq2SeqLM.from_pretrained("Helsinki-NLP/opus-mt-nl-en")


In [None]:
model_nl = AutoModelForSeq2SeqLM.from_pretrained("Helsinki-NLP/opus-mt-en-nl")


In [None]:
""" This class contains both translators. Based on the given language input,
the sentence gets translated from en-nl or nl-en"""

class Translators:
  def __init__(self, en_tokenizer, nl_tokenizer, en_model, nl_model) -> None:
    self.en_tokenizer = en_tokenizer
    self.nl_tokenizer = nl_tokenizer
    self.en_model = en_model
    self.nl_model = nl_model

  def en_translate(self, text: str) -> str:
      # translate to english
      batch = self.en_tokenizer([text], return_tensors="pt")
      generated_ids = self.en_model.generate(**batch)
      translation = self.en_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

      return translation

  def nl_translate(self, text: str) -> str:
      # translate to dutch
      batch = self.nl_tokenizer([text], return_tensors="pt")
      generated_ids = self.nl_model.generate(**batch)
      translation = self.nl_tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

      return translation

  def translate(self, text: str, language:str) -> str:
    # choose translator based on given language
    if language == "nl":
      translated = self.nl_translate(text)
    elif language == "en":
      translated = self.en_translate(text)
    else:
      translated = "Language not found"

    return translated


In [None]:
translator = Translators(tokenizer_en, tokenizer_nl, model_en, model_nl)

In [None]:
# test
nl_en = translator.translate("ik ben blij", "en")
en_nl = translator.translate("I'm happy", "nl")

print(nl_en, en_nl)

I'm glad Ik ben gelukkig.


### Pre-trained Chatbot
Import the 90M-parameter BlenderBot Small model from Facebook. This chatbot is used for simple conversation. The class removes the special tokens and surrounding whitespace from its output.
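BlenderBot Small wraps its generated text in special tokens such as `__start__` and `__end__`. A sketch of removing such markers with a regular expression instead of splitting on `_` and taking a fixed position, which breaks as soon as the reply itself contains an underscore. The pattern assumes the markers have the form `__word__`, and `clean_reply` is a hypothetical helper; with the Hugging Face tokenizer, `batch_decode(..., skip_special_tokens=True)` does the same without a regex.

```python
import re

# Matches markers of the form __start__, __end__, __unk__ (assumed format)
SPECIAL_TOKEN = re.compile(r"__[a-z]+__")

def clean_reply(decoded: str) -> str:
    """Remove special-token markers and surrounding whitespace from a decoded reply."""
    return SPECIAL_TOKEN.sub("", decoded).strip()

raw = "__start__ i'm doing well, thank you. what about you? __end__"
print(clean_reply(raw))  # i'm doing well, thank you. what about you?
```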

In [None]:
# import model and tokenizer
blenderbot = BlenderbotSmallForConditionalGeneration.from_pretrained("facebook/blenderbot_small-90M", add_cross_attention=False)
bb_tokenizer = AutoTokenizer.from_pretrained("facebook/blenderbot_small-90M")


In [None]:
""" This class contains the pre trained blenderbot. Using it's tokenizer, it returns a given input."""

class Chatbot:
  def __init__(self, model, tokenizer) -> None:
    self.model = model
    self.tokenizer = tokenizer

  def convert(self, text: str):
    # convert to reply_ids which in return provide a output from the chatbot
    inputs = self.tokenizer([text], return_tensors="pt")
    reply_ids = self.model.generate(**inputs)
    return reply_ids

  def clean_string(self, text: str) -> str:
    # using split. the sentence will always be in the same place. This way u can easily retrieve it

    splitted_text = text.split('_')
    text = splitted_text[4]

    return text

  def response(self, text: str) -> str:
    # generate clean response from chatbot
    reply_ids = self.convert(text)
    output = self.tokenizer.batch_decode(reply_ids)
    response = self.clean_string(output[0])
    return response

In [None]:
chatbot = Chatbot(blenderbot, bb_tokenizer)

In [None]:
# test
res = chatbot.response("how are you doing")
print(res)

 i'm doing well, thank you. what about you? what are you up to? 


### Retrieval based output and database
Create the retrieval-based output class and connect it to the database. This simulates the API connection by letting the retrieval-based system work together with the RobBERT model.
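A self-contained sketch of the retrieval step against an in-memory SQLite table with the same columns as outputs (id, output, emotion, score). As in the get_output method below, a random response is chosen when `random.random()` exceeds epsilon and the highest-scoring one otherwise, so epsilon is the probability of exploiting; this is the reverse of the common epsilon-greedy convention, where epsilon is the probability of exploring. The helper `retrieve`, the `demo_` names (chosen so the notebook's own conn and cur are not overwritten) and the sample rows are illustrative.

```python
import random
import sqlite3
from typing import Optional, Tuple

def retrieve(cur: sqlite3.Cursor, emotion: str, epsilon: float) -> Optional[Tuple[int, str, str, int]]:
    """Return one (id, output, emotion, score) row for the emotion, or None if there is none."""
    if random.random() > epsilon:
        # explore: any response for this emotion
        query = "SELECT id, output, emotion, score FROM outputs WHERE emotion = ? ORDER BY RANDOM() LIMIT 1"
    else:
        # exploit: the best scoring response so far
        query = "SELECT id, output, emotion, score FROM outputs WHERE emotion = ? ORDER BY score DESC LIMIT 1"
    return cur.execute(query, (emotion,)).fetchone()

demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.execute("CREATE TABLE outputs (id INTEGER PRIMARY KEY, output TEXT, emotion TEXT, score INTEGER)")
demo_cur.executemany(
    "INSERT INTO outputs (output, emotion, score) VALUES (?, ?, ?)",
    [("Wat leuk!", "joy", 0), ("Dat is goed nieuws, fijn!", "joy", 2), ("Laat me weten hoe ik kan helpen.", "sad", 0)],
)
demo_conn.commit()

# random.random() is always below 1.0, so epsilon = 1.0 always returns the best response
print(retrieve(demo_cur, "joy", 1.0))   # (2, 'Dat is goed nieuws, fijn!', 'joy', 2)
print(retrieve(demo_cur, "fear", 0.9))  # None: no rows for this emotion
```

The `?` placeholders let sqlite3 insert the emotion value safely instead of formatting it into the SQL string.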

Connect to the database.

In [None]:
db_path = "drive/MyDrive/cimsolutions_emotional_chatbot/ed_output/ed_outputs.db"

In [None]:
# make connection to the database
conn = sql.connect(db_path)
cur = conn.cursor()

In [None]:
# database content
cur.execute("SELECT id, output, emotion, score FROM outputs")
cur.fetchall()

[(1, 'Laten we de situatie bespreken en een oplossing vinden.', 'angry', 0),
 (2,
  ' Ik begrijp dat je het niet leuk vindt. Ik ge voor jou een oplossing zoeken.',
  'angry',
  0),
 (3,
  'Laten we kijken wat er aan de hand is en bekijken hoe we het kunnen oplossen.',
  'angry',
  0),
 (4,
  'Laten we kalm blijven en samen naar een bruikbare oplossing zoeken.',
  'angry',
  0),
 (5,
  'Als er zaken zijn die moeten worden opgelost, laten we ze uitzoeken  en aanpakken.',
  'angry',
  0),
 (6, 'Fijn om te horen dat je tevreden bent!', 'joy', 0),
 (7, 'Het is goed om te horen dat je zo blij bent!', 'joy', 0),
 (8, 'Dat is goed nieuws, fijn!', 'joy', 0),
 (9, 'Graag gedaan, blij dat ik kan helpen!', 'joy', 0),
 (10, 'Wat leuk!', 'joy', 0),
 (11, 'Laat me weten hoe ik kan helpen.', 'sad', 0),
 (12, 'Als je iets nodig hebt, aarzel dan niet om te vragen.', 'sad', 0),
 (13, 'We gaan samen een oplossing zoeken..', 'sad', 0),
 (14, 'Als je hulp nodig hebt, laat het me weten.', 'sad', 0),
 (15, 'A

In [None]:
""" This class works with the robbert model class.
To provide a response given a detected emotion.
The response comes from the database"""

class RetrievalOutput:
  def __init__(self, conn, cur, epsilon, ed_model):
    self.database = conn,
    self.cur = cur
    self.memory = []
    self.ed_model = ed_model
    self.conv_id = 0
    self.epsilon = epsilon
    self. categories = {"pos": ["joy", "love"], "neu": ["neutral", "other"], "neg": ["sad", "angry", "fear"]}
    self.grading = {"neg-neg": -1, "neg-neu": 1, "neg-pos": 2, "neu-neu": 0, "neu-pos": 1, "neu-neg": -1, "pos-pos": 1, "pos-neu": 0, "pos-neg": -2}

  def check_emotion(self, text: str) -> str:
    # in this function get the emotion of a text
    emotion = self.ed_model.predict(text)
    # small print statement to show in the test conversation
    print("detected_emotion: ", emotion)
    return emotion

  def get_output(self, emotion:str):
    # retrieve from db using policy greedy
    if emotion != "neutral" and emotion != "other":
      rd_int = round(rd.random(), 2)
      if rd_int > self.epsilon:
        # random
        query = "SELECT id, output, emotion, score FROM outputs WHERE emotion = '%s' ORDER BY RANDOM() LIMIT 1" % emotion
      else:
        # not random
        query = "SELECT id, output, emotion, score FROM outputs WHERE emotion = '%s' ORDER BY score DESC LIMIT 1" % emotion

      output = self.cur.execute(query)
      output = list(self.cur.fetchone())

    else:
      output = None

    return output

  def save(self, input:str, rb_output:List, emotion:str):
    # save conversation as dict for grading

    # get correct category using emotion
    for key, value in self.categories.items():
      for x in value:
        if x == emotion:
          category = key
    conversation = {"conv_id": self.conv_id, "input": input, "rb": rb_output, "pnn": category}
    self.memory.append(conversation)

  def grade(self):
    # grade previous response using current conversation
     sentiment = self.memory[self.conv_id]['pnn']
     if len(self.memory) > 1:
      prev_conv = self.memory[self.conv_id - 1]
      prev_sentiment = prev_conv['pnn']

      change = prev_sentiment + "-" + sentiment
      print("conversation-change: ", change)
      grade = self.grading[change]
      # grade the score of the previous response in the database
      if prev_sentiment != "neu":
        prev_conv['rb'][3] = prev_conv['rb'][3] + grade
        self.update_db(prev_conv['rb'])

     self.conv_id += 1

  def update_db(self, response:List):
    # update the correct db output with the new score
    query = "UPDATE outputs SET score = %s WHERE id = %s" % (response[3], response[0])
    cur.execute(query)

  def reset_score(self):
    # reset all scores of db
    query = "UPDATE outputs SET score = 0"
    cur.execute(query)

  def reset_memory(self):
    # reset memory of class
    self.memory.clear()

  def forward(self, text:str) -> str:
    # return response.
    emotion =  self.check_emotion(text)
    rb_output =  self.get_output(emotion)
    self.save(text, rb_output, emotion)
    self.grade()

    if rb_output == None:
      return rb_output

    else:
      return rb_output[1]

In [None]:
rbo =  RetrievalOutput(conn, cur, 0.9, robbert)

In [None]:
# test
rbo.forward("ik ben bang")

detected_emotion:  fear


'Kan ik iets doen om je gerust te stellen?'
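The feedback algorithm grades the previous response by how the user's sentiment changed: each emotion is mapped to a category (pos, neu or neg), and the pair "previous-current" is looked up in the grading table. A sketch of that rule with the category and grading tables from the RetrievalOutput class, applied to an in-memory SQLite table. It updates the score relatively with `score = score + ?`, so an older copy of a row cannot overwrite a newer score. The helper names and the single sample row are illustrative.

```python
import sqlite3

CATEGORIES = {"pos": ["joy", "love"], "neu": ["neutral", "other"], "neg": ["sad", "angry", "fear"]}
GRADING = {"neg-neg": -1, "neg-neu": 1, "neg-pos": 2, "neu-neu": 0, "neu-pos": 1,
           "neu-neg": -1, "pos-pos": 1, "pos-neu": 0, "pos-neg": -2}

def category_of(emotion: str) -> str:
    """Map an emotion label to its sentiment category; unknown labels count as neutral."""
    return next((key for key, values in CATEGORIES.items() if emotion in values), "neu")

def grade_previous(cur: sqlite3.Cursor, prev_emotion: str, prev_response_id: int, emotion: str) -> int:
    """Add the grade for the sentiment change to the previous response's score and return it."""
    change = category_of(prev_emotion) + "-" + category_of(emotion)
    grade = GRADING[change]
    cur.execute("UPDATE outputs SET score = score + ? WHERE id = ?", (grade, prev_response_id))
    return grade

demo_conn = sqlite3.connect(":memory:")
demo_cur = demo_conn.cursor()
demo_cur.execute("CREATE TABLE outputs (id INTEGER PRIMARY KEY, output TEXT, emotion TEXT, score INTEGER)")
demo_cur.execute("INSERT INTO outputs VALUES (11, 'Laat me weten hoe ik kan helpen.', 'sad', 0)")

# the user was sad, got response 11, and is now joyful: neg -> pos earns +2
print(grade_previous(demo_cur, "sad", 11, "joy"))  # 2
print(demo_cur.execute("SELECT score FROM outputs WHERE id = 11").fetchone()[0])  # 2
demo_conn.commit()
```

In the notebook, a neutral previous turn has no retrieved response, so it is skipped instead of graded.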

# End product
Using all the classes, a final class called ConversationAgent is created. It uses all the other classes to provide an answer to the user. In a regular project, this would be the main.py.
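The agent combines two sources: the translated chatbot reply and, when an emotion with stored responses is detected, the retrieved response. A sketch of that combination with stand-in functions (hypothetical, so it runs without the models), joining the two parts with a space so the sentences do not run together.

```python
from typing import Callable, Optional

def combine_reply(chatbot_reply: str, retrieved: Optional[str]) -> str:
    """Join the chatbot reply and the retrieved response, skipping a missing retrieval."""
    if retrieved is None:
        return chatbot_reply
    return chatbot_reply.strip() + " " + retrieved.strip()

def respond(text: str,
            chatbot: Callable[[str], str],
            retriever: Callable[[str], Optional[str]]) -> str:
    """Ask both components and merge their answers."""
    return combine_reply(chatbot(text), retriever(text))

# Hypothetical stand-ins for the translated chatbot and the retrieval-based output
def fake_chatbot(text: str) -> str:
    return "Wat is er aan de hand?"

def fake_retriever(text: str) -> Optional[str]:
    return "Als je hulp nodig hebt, laat het me weten." if "bang" in text else None

print(respond("ik ben bang", fake_chatbot, fake_retriever))
# Wat is er aan de hand? Als je hulp nodig hebt, laat het me weten.
print(respond("hoi", fake_chatbot, fake_retriever))
# Wat is er aan de hand?
```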

In [None]:
""" This class creates a conversation agent that can be used to creat small conversations where an emotion is being detected.
It combines the chatbot output with the retrieval based output (could be made cleaner with nlp)"""

class ConversationAgent:

  def __init__(self, chatbot, translators, rbo):
    self.cb = chatbot
    self.translators =  translators
    self.rbo = rbo

  def get_rbo(self, text:str) -> str:
    # get output from retrieval based system
    rb_output = self.rbo.forward(text)
    return rb_output

  def cb_response(self, text:str) -> str:
    # get output from chatbot
    en_text = self.translators.translate(text, "en")
    cb_text = self.cb.response(en_text)
    nl_text =  self.translators.translate(cb_text, "nl")
    return nl_text

  def combine_outputs(self, cb_output:str, retrieval_output:str) -> str:
    # combine the two outputs
    combined =  cb_output + retrieval_output
    return combined

  def forward(self, text: str) -> str:
    # get response
    cb_output =  self.cb_response(text)
    retrieval_output = self.get_rbo(text)
    if retrieval_output ==  None:
      return cb_output
    else:
    combined = self.combine_outputs(cb_output, retrieval_output)
      return combined

In [None]:
# test
#reset memory and database before the testing starts
rbo.reset_score()
rbo.reset_memory()
# start counting conversation turns from 0 again
rbo.conv_id = 0

conv_agent = ConversationAgent(chatbot, translator, rbo)

In [None]:
# small example; running this cell is optional
conv_agent.forward("hoe gaat het")

detected_emotion:  sad
conversation-change:  neg-neg


'Ik weet niet of ik terug naar school wil.Als je hulp nodig hebt, laat het me weten.'

### Test conversation
The cell below lets you have a conversation with the ConversationAgent, which combines the two outputs. Type "exit" to stop; the loop ends automatically after 10 turns. Afterwards, run the database cell to show the scores of all outputs in the database.
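A sketch of the same bounded loop that takes the user inputs as an iterable and the agent as a function, so it can be tried without typing into `input()`. `run_conversation` and `echo_agent` are hypothetical; running out of inputs is treated the same as typing "exit".

```python
from typing import Callable, Iterable, List

def run_conversation(agent: Callable[[str], str], inputs: Iterable[str], max_turns: int = 10) -> List[str]:
    """Send inputs to the agent until 'exit' is entered, the inputs run out, or max_turns replies exist."""
    responses: List[str] = []
    remaining = iter(inputs)
    # check the turn limit before reading the next input, like the notebook's while loop
    while len(responses) < max_turns:
        user_input = next(remaining, "exit")
        if user_input == "exit":
            break
        responses.append(agent(user_input))
    return responses

def echo_agent(text: str) -> str:
    # hypothetical stand-in for conv_agent.forward
    return "echo: " + text

print(run_conversation(echo_agent, ["ik ben bang", "nu werkt het wel", "exit", "hallo"]))
# ['echo: ik ben bang', 'echo: nu werkt het wel']
```

Because the limit is checked before the next input is read, no extra input is requested once the maximum number of turns is reached.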

In [None]:
x = 0
while x < 10:
  user_input = input("enter your text here: ")
  if user_input == "exit":
    break
  else:
    response = conv_agent.forward(user_input)
    print("response: ", response)
    x += 1

enter your text here: ik ben bang
detected_emotion:  fear
conversation-change:  neg-neg
response:  Wat is er aan de hand in je leven waar je bang voor bent?Kan ik iets doen tegen je angst?
enter your text here: nu werkt het wel
detected_emotion:  neutral
conversation-change:  neg-neu
response:  Fijn om te horen, ik ben blij dat het goed voor je gaat.


KeyboardInterrupt: ignored

In [None]:
cur.execute("SELECT id, output, emotion, score FROM outputs")
cur.fetchall()

[(1, 'Laten we de situatie bespreken en een oplossing vinden.', 'angry', 0),
 (2,
  ' Ik begrijp dat je het niet leuk vindt. Ik ge voor jou een oplossing zoeken.',
  'angry',
  0),
 (3,
  'Laten we kijken wat er aan de hand is en bekijken hoe we het kunnen oplossen.',
  'angry',
  0),
 (4,
  'Laten we kalm blijven en samen naar een bruikbare oplossing zoeken.',
  'angry',
  0),
 (5,
  'Als er zaken zijn die moeten worden opgelost, laten we ze uitzoeken  en aanpakken.',
  'angry',
  0),
 (6, 'Fijn om te horen dat je tevreden bent!', 'joy', 0),
 (7, 'Het is goed om te horen dat je zo blij bent!', 'joy', 0),
 (8, 'Dat is goed nieuws, fijn!', 'joy', 0),
 (9, 'Graag gedaan, blij dat ik kan helpen!', 'joy', 0),
 (10, 'Wat leuk!', 'joy', 0),
 (11, 'Laat me weten hoe ik kan helpen.', 'sad', -1),
 (12, 'Als je iets nodig hebt, aarzel dan niet om te vragen.', 'sad', 0),
 (13, 'We gaan samen een oplossing zoeken..', 'sad', 0),
 (14, 'Als je hulp nodig hebt, laat het me weten.', 'sad', 0),
 (15, '