<a href="https://colab.research.google.com/github/00SamYun/simple_chabot_model/blob/main/main_program.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# runtime - CPU

#### Setup

In [None]:
# Authenticate the current Colab user before anything else runs.
# NOTE(review): presumably this is what grants access to the gs:// paths the
# model classes read from later in the notebook — confirm.
from google.colab import auth
auth.authenticate_user()

In [None]:
from IPython.display import clear_output

In [None]:
import os
import json
from urllib.request import Request, urlopen
from bs4 import BeautifulSoup

import re
import random
import numpy as np
from itertools import repeat
from random import choice

import tensorflow as tf
import nltk
import spacy
from nltk.tokenize import word_tokenize
from nltk.sentiment.vader import SentimentIntensityAnalyzer

from official.nlp import bert
import official.nlp.bert.configs
import official.nlp.bert.bert_models
import official.nlp.bert.tokenization

from transformers import TFT5ForConditionalGeneration, T5Tokenizer

# Only let TensorFlow log messages of level ERROR through its logger.
tf.get_logger().setLevel('ERROR')

In [None]:
# NLTK resources used below: 'punkt' for word_tokenize and 'vader_lexicon'
# for SentimentIntensityAnalyzer. Fetched quietly, in this order.
for nltk_resource in ('punkt', 'vader_lexicon'):
    nltk.download(nltk_resource, quiet=True)

#### Input Model

In [None]:
class InputModel():
    """Extract subject / predicate / object keywords from a user utterance.

    Pipeline (see ``extract_data``): keyword candidates are selected with NLTK
    tokenisation and spaCy part-of-speech tags, every candidate is paired with
    the full utterance and classified by a three-label BERT classifier, and
    the labelled keywords are flattened into a ' | '-separated string.
    """

    def __init__(self):
        """Build the BERT classifier, load its weights and create the tokenizer."""

        gs_folder = "gs://cloud-tpu-checkpoints/bert/v3/uncased_L-12_H-768_A-12"
        config_file = os.path.join(gs_folder, "bert_config.json")
        # Context manager so the file handle is closed once the config is read.
        with tf.io.gfile.GFile(config_file) as config_handle:
            config_dict = json.loads(config_handle.read())
        config = bert.configs.BertConfig.from_dict(config_dict)

        # classifier_model returns a sequence; only its first element is used.
        self.model = bert.bert_models.classifier_model(config, num_labels=3)[0]
        self.model.load_weights('gs://PATH_TO_BUCKET/input_model/training_weights')

        self.tokenizer = bert.tokenization.FullTokenizer(
            vocab_file=os.path.join(gs_folder, 'vocab.txt'), do_lower_case=True)


    def _get_spacy(self):
        """Return the spaCy pipeline, loading it on first use only.

        Loading the model is expensive, so it is cached on the instance rather
        than being reloaded for every utterance.
        """

        pipeline = getattr(self, '_sp', None)
        if pipeline is None:
            pipeline = self._sp = spacy.load('en_core_web_sm')

        return pipeline


    def get_kws(self, user_input):
        """Pick keyword candidates out of ``user_input``.

        Args:
            user_input: the raw utterance (str).

        Returns:
            ``(unigrams, user_input)`` where ``unigrams`` is the list of
            distinct token texts whose fine-grained tag is in ``pos_list`` or
            starts with one of ``pos_starters``; ``user_input`` is passed
            through unchanged so the pair can be splatted into
            ``prepare_inputs``.
        """

        sp = self._get_spacy()

        stopwords = sp.Defaults.stop_words
        pos_list = ['CD', 'FW', 'PRP', 'WP', 'WRB']
        pos_starters = ('JJ', 'NN', 'VB')

        # Stop words are removed before tagging (the membership test is case-sensitive).
        text = ' '.join([w for w in word_tokenize(user_input) if w not in stopwords])
        # Keyed by token text: a repeated word is kept once, with its last tag.
        tags = {tok.text: tok.tag_ for tok in sp(text)}

        unigrams = [w for w, p in tags.items() if p in pos_list or p.startswith(pos_starters)]

        return unigrams, user_input


    def encode(self, text):
        """Tokenise ``text`` and return its token ids followed by the '[SEP]' id."""

        tokens = list(self.tokenizer.tokenize(text))
        tokens.append('[SEP]')

        return self.tokenizer.convert_tokens_to_ids(tokens)


    def prepare_inputs(self, unigrams, user_input):
        """Build the classifier inputs: one ``[CLS] keyword [SEP] utterance [SEP]`` row per keyword.

        Keywords can tokenise to different numbers of word pieces, so the rows
        are assembled as ragged tensors and only padded to a rectangle at the
        end (``tf.constant`` rejects non-rectangular nested lists).

        Args:
            unigrams: non-empty list of keyword strings.
            user_input: the full utterance used as context for every keyword.

        Returns:
            dict with dense 'input_word_ids', 'input_mask' and
            'input_type_ids' tensors, one row per keyword.
        """

        # The context is identical for every row, so encode it once.
        encoded_context = self.encode(user_input)

        words = tf.ragged.constant([self.encode(w) for w in unigrams])
        contexts = tf.ragged.constant([encoded_context] * len(unigrams))

        cls = [self.tokenizer.convert_tokens_to_ids(['[CLS]'])] * len(unigrams)
        input_word_ids = tf.concat([cls, words, contexts], axis=-1)

        # ones_like on the ragged ids, then padded: real tokens -> 1, padding -> 0.
        input_mask = tf.ones_like(input_word_ids).to_tensor()

        # Segment ids: 0 for [CLS] + keyword, 1 for the context.
        type_cls = tf.zeros_like(cls)
        type_words = tf.zeros_like(words)
        type_contexts = tf.ones_like(contexts)
        input_type_ids = tf.concat([type_cls, type_words, type_contexts], axis=-1).to_tensor()

        inputs = {
            'input_word_ids': input_word_ids.to_tensor(),
            'input_mask': input_mask,
            'input_type_ids': input_type_ids}

        return inputs


    def assign(self, unigrams, model_inputs):
        """Label every keyword as subject, predicate or object.

        Args:
            unigrams: keyword strings, in the same order as the rows of
                ``model_inputs``.
            model_inputs: dict produced by ``prepare_inputs``.

        Returns:
            dict with 'subject', 'predicate' and 'object' lists. An empty
            subject list defaults to ['You']; an empty predicate or object
            list is filled with one randomly chosen keyword.
        """

        roles = ('subject', 'predicate', 'object')

        logits = self.model(model_inputs)
        probabilities = tf.nn.softmax(logits)
        scores = tf.math.argmax(probabilities, axis=-1).numpy()

        dct = {role: [] for role in roles}

        for kw, s in zip(unigrams, scores):
            label = int(s)
            # Labels outside 0..2 are ignored, as before.
            if 0 <= label < len(roles):
                dct[roles[label]].append(kw)

        if dct['subject'] == []: dct['subject'] = ['You']
        if dct['predicate'] == []: dct['predicate'] = [choice(unigrams)]
        if dct['object'] == []: dct['object'] = [choice(unigrams)]

        return dct


    def format_inp(self, tagged_text):
        """Flatten the labelled keywords into a ' | '-separated triple string.

        Every (subject, predicate, object) combination is emitted, with the
        subject varying slowest and the object fastest.

        Args:
            tagged_text: dict with non-empty 'subject', 'predicate' and
                'object' lists of strings.

        Returns:
            str such as 'I | like | cake | I | like | pie'.
        """

        from itertools import product

        triples = product(
            tagged_text['subject'], tagged_text['predicate'], tagged_text['object'])

        transit_str = ' | '.join(' | '.join(triple) for triple in triples)

        return transit_str


    def extract_data(self, user_input):
        """Run the full pipeline on ``user_input``.

        Returns:
            the transit string, or None when no keyword candidate was found.
        """

        kws = self.get_kws(user_input)

        if kws[0] == []: return None

        model_inputs = self.prepare_inputs(*kws)
        tagged_text = self.assign(kws[0], model_inputs)
        transit_str = self.format_inp(tagged_text)

        return transit_str

#### Output Model

In [None]:
class OutputModel():
    """Generate a reply sentence from a transit string with a fine-tuned T5 model."""

    def __init__(self):
        """Load the 't5-base' model, overlay the saved weights and build the tokenizer."""

        # The original bound this path to `weights_dir` but loaded
        # `weights_path`, so construction always failed with NameError.
        weights_path = 'gs://PATH_TO_BUCKET/output_model/saved_weights'

        self.optimizer = tf.keras.optimizers.Adam()
        self.model = TFT5ForConditionalGeneration.from_pretrained('t5-base')
        self.model.load_weights(weights_path)

        self.tokenizer = T5Tokenizer.from_pretrained('t5-base')

    def encode(self, transit_str):
        """Return the tokenizer's 'input_ids' for ``transit_str`` as a TensorFlow tensor."""

        return self.tokenizer(transit_str, return_tensors='tf')['input_ids']


    def generate_text(self, transit_str):
        """Generate a reply for ``transit_str``.

        Args:
            transit_str: string produced by the input model, or a falsy value
                (None / '') when no keywords could be extracted.

        Returns:
            the decoded first generated sequence, or a fixed apology when
            ``transit_str`` is falsy.
        """

        if not transit_str: return "I'm sorry. I don't understand what you're saying."

        input_ids = self.encode(transit_str)
        # Only the first generated sequence is decoded.
        output_ids = self.model.generate(input_ids).numpy()[0]
        sentence = self.tokenizer.decode(output_ids, skip_special_tokens=True)

        return sentence

#### Greetings

In [None]:
def pick_webpage(option):
    """Return a randomly chosen source URL for the requested kind of content.

    Args:
        option: one of 'joke', 'quote' or 'fun fact'.

    Returns:
        a URL string picked at random from the list for ``option``.

    Raises:
        ValueError: if ``option`` is not one of the supported kinds
            (previously this surfaced as an UnboundLocalError).
    """

    webpages = {
        'joke': ["https://www.countryliving.com/life/a27452412/best-dad-jokes/",
                 "https://parade.com/968634/parade/jokes-for-kids/",
                 "https://parade.com/1041830/marynliles/clean-jokes/"],
        'quote': ["https://www.berries.com/blog/positive-quotes",
                  "https://blog.hubspot.com/sales/18-motivational-quotes-to-start-your-day-list",
                  "https://my.oberlo.com/blog/motivational-quotes",
                  "https://www.shopify.my/blog/motivational-quotes",
                  "https://wisdomquotes.com/quote-of-the-day/"],
        'fun fact': ["https://www.scarymommy.com/interesting-facts/",
                     "https://redtri.com/quirky-facts-and-trivia-for-kids/"],
    }

    if option not in webpages:
        raise ValueError(
            'unknown option {!r}; expected one of {}'.format(option, sorted(webpages)))

    return random.choice(webpages[option])

In [None]:
def get_webpage(url, timeout=10):
    """Download the raw HTML of ``url``, preferring Google's cached copy.

    The cached copy is requested first; if that request fails for any
    network/HTTP reason (the cache endpoint is not guaranteed to be
    available), the page is fetched directly from ``url`` instead.

    Args:
        url: address of the page to fetch.
        timeout: seconds to wait on each request before giving up.

    Returns:
        the response body as bytes.

    Raises:
        OSError: (including urllib's URLError / HTTPError) if the direct
            fetch fails as well.
    """

    headers = {'User-Agent': 'Mozilla/5.0'}
    cache_url = "http://webcache.googleusercontent.com/search?q=cache:" + url

    try:
        # `with` closes the connection once the body has been read.
        with urlopen(Request(cache_url, headers=headers), timeout=timeout) as response:
            return response.read()
    except OSError:
        # URLError, HTTPError and socket timeouts are all OSError subclasses.
        with urlopen(Request(url, headers=headers), timeout=timeout) as response:
            return response.read()

In [None]:
def get_jokes(html_doc, url):
    """Extract a list of jokes from a joke page.

    Two page layouts are tried in order:
      1. every <li> that contains no link, with double quotes removed;
      2. the 'articleBody' of the page's ld+json <script>, split on the
         literal marker '[NUM]' (first and last pieces dropped).

    Args:
        html_doc: the page's HTML.
        url: the page's address, printed when nothing could be extracted.

    Returns:
        a non-empty list of joke strings, or None (after printing ``url``)
        when neither layout yields anything.
    """

    soup = BeautifulSoup(html_doc, 'html.parser')

    text_list = [li.get_text().replace('"', '') for li in soup.find_all('li') if not li.a]

    if text_list: return text_list

    # A missing script tag, invalid JSON or JSON without an 'articleBody'
    # string used to raise here; treat all of them as "layout not present".
    article_body = ''
    script = soup.find('script', type='application/ld+json')
    if script is not None:
        try:
            article_body = json.loads(script.text)['articleBody']
        except (ValueError, KeyError, TypeError):
            article_body = ''

    if isinstance(article_body, str):
        text_list = article_body.split('[NUM]')[1:-1]

        if text_list: return text_list

    print(url)

    return None


def get_quotes(html_doc, url):
    """Extract a list of quotes from a quote page.

    Four page layouts are tried in order and the first one that yields
    anything wins. If none does, ``url`` is printed and None is returned.
    """

    soup = BeautifulSoup(html_doc, 'html.parser')

    # Layout for source index 0: paired 'box-quote' / 'box-name' paragraphs,
    # joined as "<quote>.<name>".
    quote_tags = soup.find_all('p', attrs={'class': 'box-quote'})
    name_tags = soup.find_all('p', attrs={'class': 'box-name'})
    paired = []
    for quote_tag, name_tag in zip(quote_tags, name_tags):
        paired.append('.'.join((quote_tag.get_text(), name_tag.get_text())))
    if paired:
        return paired

    # Layout for source index 1: paragraphs whose text starts with a digit.
    numbered = []
    for paragraph in soup.find_all('p'):
        content = paragraph.get_text()
        if content and content[0].isdigit():
            numbered.append(content)
    if numbered:
        return numbered

    # Layout for source indices 2 and 3: link-free list items of 40+ characters.
    long_items = []
    for item in soup.find_all('li'):
        if item.a:
            continue
        content = item.get_text()
        if len(content) >= 40:
            long_items.append(content)
    if long_items:
        return long_items

    # Layout for source index 4: every <blockquote>.
    block_quotes = [block.get_text() for block in soup.find_all('blockquote')]
    if block_quotes:
        return block_quotes

    print(url)


def get_facts(html_doc, url):
    """Extract a list of fun facts from a facts page.

    A fact is a paragraph of the form "<number>.<text>"; the number and dot
    are dropped and the text is stripped. Two layouts are tried in order:
      1. paragraphs inside the 'entry-content' <div> (source index 0);
      2. every paragraph on the page (source index 1).

    The second layout used to be tried only when the first one raised, so a
    page that had an 'entry-content' div without numbered paragraphs returned
    None silently. It is now a plain fall-through.

    Args:
        html_doc: the page's HTML.
        url: the page's address, printed when nothing could be extracted.

    Returns:
        a non-empty list of fact strings, or None (after printing ``url``).
    """

    soup = BeautifulSoup(html_doc, 'html.parser')

    def numbered_facts(paragraphs):
        # Keep only paragraphs that start with "<digits>." and return the rest of the line.
        facts = []
        for paragraph in paragraphs:
            found = re.findall(r'^\d+\.(.+)', paragraph.get_text())
            if found:
                facts.append(found[0].strip())
        return facts

    container = soup.find('div', {'class': 'entry-content'})
    if container is not None:
        text_list = numbered_facts(container.find_all('p'))

        if text_list: return text_list

    text_list = numbered_facts(soup.find_all('p'))

    if text_list: return text_list

    print(url)

    return None

In [None]:
def output_joke(jokes_list):
    """Print a random joke from ``jokes_list``.

    A joke containing '?' is told in two parts: everything up to and
    including the first '?' is printed, the user is expected to respond
    (their input is discarded), then the rest is printed. Any other joke is
    printed in one go.

    Args:
        jokes_list: list of joke strings; may be None or empty when scraping
            found nothing, in which case a fallback line is printed instead
            of crashing.
    """

    if not jokes_list:
        print("I couldn't think of a joke right now.")
        return

    joke = random.choice(jokes_list)

    if '?' in joke:
        split_at = joke.index('?') + 1

        print(joke[:split_at].strip())
        input()  # wait for the user's guess before delivering the punchline
        print(joke[split_at:].strip())

    else:
        print(joke.strip())


def output_quote(quotes_list):
    """Print a random quote as "<person> once said <quote>" and ask for an opinion.

    The person is taken to be the last run of word/whitespace characters in
    the scraped text; only that trailing run is cut out of the quote. (The
    previous regex substitution removed every occurrence of the person text,
    which mangled quotes that repeated it — e.g. a whitespace-only match
    stripped all spaces.) The name is stripped before being compared or
    printed, and an unknown, anonymous or blank name is printed as "Someone".

    Args:
        quotes_list: list of scraped quote strings; may be None or empty when
            scraping found nothing, in which case a fallback line is printed.
    """

    if not quotes_list:
        print("I couldn't find a quote to share right now.")
        return

    text = random.choice(quotes_list)

    runs = list(re.finditer(r'[\w\s]+', text))

    if runs:
        last_run = runs[-1]
        person = last_run.group().strip()
        quote_step = text[:last_run.start()] + text[last_run.end():]
    else:
        # No word characters at all: nothing that could be a name.
        person = ''
        quote_step = text

    # Trim leading non-letters and trailing non-letters (a final '.' is kept).
    quote = re.sub(r'^[^(a-zA-Z)]*|[^(a-zA-Z.)]*$', '', quote_step)

    if person.lower() in ('unknown', 'anonymous', ''):
        print('Someone once said {}'.format(quote))

    else:
        print('{} once said {}'.format(person, quote))

    print('What do you think?')


def output_fact(facts_list):
    """Print a random fact from ``facts_list`` behind a random opener.

    Args:
        facts_list: list of fact strings; may be None or empty when scraping
            found nothing, in which case a fallback line is printed instead
            of crashing.
    """

    if not facts_list:
        print("I couldn't find a fun fact right now.")
        return

    openers = ['Did you know? ', 'Apparently ', 'Fun fact! ']

    # Opener is drawn before the fact, as before.
    opener = random.choice(openers)

    fact = random.choice(facts_list)

    print(opener + fact)

In [None]:
def greet_by_joke():
    """Pick a joke page, download it, scrape its jokes and tell one."""

    source_url = pick_webpage('joke')
    html = get_webpage(source_url)
    output_joke(get_jokes(html, source_url))


def greet_by_quote():
    """Pick a quote page, download it, scrape its quotes and share one."""

    source_url = pick_webpage('quote')
    html = get_webpage(source_url)
    output_quote(get_quotes(html, source_url))


def greet_by_fact():
    """Pick a fun-fact page, download it, scrape its facts and share one."""

    source_url = pick_webpage('fun fact')
    html = get_webpage(source_url)
    output_fact(get_facts(html, source_url))

In [None]:
def greetings():
    """Open the conversation.

    Waits for the user's first message (its content is ignored), answers with
    a random greeting, follows up with a joke, quote or fun fact chosen at
    random, then reads the user's reaction and prints an emoji matching its
    VADER compound sentiment score.
    """

    salutations = ['Hi!', 'Hey!', 'Hello!']
    ice_breakers = (greet_by_joke, greet_by_quote, greet_by_fact)

    input()
    print(random.choice(salutations))

    # randint is inclusive on both ends, so this indexes 0, 1 or 2.
    ice_breakers[random.randint(0, 2)]()

    analyzer = SentimentIntensityAnalyzer()
    compound = analyzer.polarity_scores(input())['compound']

    emoji = None
    if compound >= 0.05:
        emoji = '\U0001F604'
    elif -0.05 < compound < 0.05:
        emoji = '\U0001F610'
    elif compound <= -0.05:
        emoji = '\U0001F61E'

    if emoji is not None:
        print(emoji)

#### Main Program

In [None]:
# Build both models once, up front; the chat loop below reuses these instances.
input_model = InputModel()
output_model = OutputModel()

# Clear whatever the constructors printed to the cell output.
clear_output()

In [None]:
# Usage instructions, then the scripted greeting exchange.
for instruction in ('To end the conversation at any point, type "bye".',
                    'To start the conversation, please greet the bot!'):
    print(instruction)

greetings()

# iter(input, None) keeps reading lines indefinitely (input() never returns
# None); the loop only ends when the user types "bye" in any letter case.
for user_input in iter(input, None):

    if user_input.lower() == 'bye':
        print('Bye!')
        break

    # utterance -> keyword triples -> generated reply
    print(output_model.generate_text(input_model.extract_data(user_input)))