<a href="https://colab.research.google.com/github/ZhuRuoyu/calendar-chatbot/blob/main/customized_model/InputRecognition_TimeFormat.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [109]:
#library
!pip install sentence_transformers

from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
import pandas as pd
import spacy
from spacy import displacy
from spacy.tokens import DocBin
import json
from datetime import datetime, timedelta, date, timezone
from tqdm import tqdm
import re

import pytz #timezone

!pip install datefinder
import datefinder


!pip install word2number
from word2number import w2n

from dateutil import parser # date



# Load the pre-trained models (sentence encoder and custom NER)

In [21]:
# The pre-trained models are loaded once here, outside the classes, so that
# every InputRecognition instance can reuse them (the class reads `bert_model`
# and `ner_output` as module-level globals).

# Reference sentences, one per supported intent. The position of each sentence
# is significant: InputRecognition.match_intent_to_function maps the index of
# the best-matching sentence to a calendar function name.
intents = [
    "create a new event",                                    # 0
    "delete a certain event",                                # 1
    "retrieve event detail of a certain date",               # 2 -> TAG: retrieving event (intent) today (entity arguments)
    "get free time information during certain time period",  # 3
    "change the location or summary of an event",            # 4
    "reschedule the time of an event",                       # 5
]

# Sentence encoder used both for the reference intents and for user input.
sentence_model_name = 'bert-base-nli-mean-tokens'
bert_model = SentenceTransformer(sentence_model_name)

# One embedding row per intent sentence; original note: shape = (6, 768).
embedded_intents = bert_model.encode(intents)

# Custom-trained spaCy NER pipeline, read from a path relative to the working
# directory (presumably a mounted Google Drive -- confirm before running).
ner_model_root = 'drive/MyDrive/capstone_ner_output'
ner_output = spacy.load(ner_model_root + '/model-best')



# TimeFormat: normalize the date and time arguments

In [118]:
class TimeFormat:
    """Normalize the DATE / START_TIME / END_TIME / DURATION entries of an argument dict.

    The dict passed in is modified in place and is also the value returned by
    every formatting method:

    * ``DATE``       -> ``"YYYY-MM-DD"`` (today's date in ``timezone_want`` when absent)
    * ``START_TIME`` -> ``"HH:MM:SS"`` (left untouched when absent)
    * ``END_TIME``   -> ``"HH:MM:SS"`` (derived from START_TIME + DURATION when it
      is absent or cannot be parsed)
    * ``DURATION``   -> ``[days, hours, minutes]`` whenever END_TIME was derived

    Some methods call ``input()`` and therefore block until the user answers.

    Args:
        arg_dict: raw arguments, e.g. ``{'START_TIME': '9am'}``.
        timezone_want: timezone name understood by ``pytz.timezone``; used to
            decide what "today" is.
    """

    # Duration used when START_TIME is known but no DURATION was supplied.
    _DEFAULT_DURATION = (0, 1, 0)  # days, hours, minutes

    # Relative day words resolved against "today" in the configured timezone.
    _RELATIVE_DAYS = {'yesterday': -1, 'today': 0, 'tomorrow': 1}

    # "<number> <unit>" pairs. Numbers are digits or the words one..ten; units
    # are day/hour/minute (optionally plural) or the abbreviations hr(s)/min(s).
    # The abbreviations need a word boundary so "10 minimum" is not read as
    # "10 min".
    _DURATION_PATTERN = re.compile(
        r'(\d+|one|two|three|four|five|six|seven|eight|nine|ten)\s*'
        r'(day|hour|minute|hrs?\b|mins?\b)s?',
        re.IGNORECASE,
    )
    _UNIT_ALIASES = {'hr': 'hour', 'hrs': 'hour', 'min': 'minute', 'mins': 'minute'}

    def __init__(self, arg_dict, timezone_want = 'America/Toronto'):
        self.arg_dict = arg_dict
        self.timezone = timezone_want
        # Order matters: end_time_format reads the START_TIME value that
        # start_time_format has already normalized.
        self.fun_pool = [self.date_format,
                         self.start_time_format,
                         self.end_time_format,
                         ]

    def __extract_duration_components(self, duration_string):
        """Parse a free-text duration into ``[days, hours, minutes]``.

        Matching is case-insensitive. Units that do not appear are 0; if a unit
        appears more than once, the last occurrence wins.
        """
        time_components = {'day': 0, 'hour': 0, 'minute': 0}

        for number, unit in self._DURATION_PATTERN.findall(duration_string):
            if number.isalpha():
                amount = w2n.word_to_num(number.lower())
            else:
                amount = int(number)
            unit = unit.lower()
            time_components[self._UNIT_ALIASES.get(unit, unit)] = amount

        return [time_components['day'], time_components['hour'], time_components['minute']]

    def __get_current_date_in_timezone(self, timezone_name):
        """Return today's date as seen in the timezone called ``timezone_name``."""
        # datetime.now(tz) yields an aware datetime directly, replacing the
        # deprecated datetime.utcnow() + localize + astimezone sequence.
        return datetime.now(pytz.timezone(timezone_name)).date()

    def __first_time_match(self, text):
        """Return the first datetime datefinder extracts from ``text``, or None."""
        for match in datefinder.find_dates(text):
            return match
        return None

    def date_format(self):
        """Rewrite ``arg_dict['DATE']`` as ``YYYY-MM-DD`` and return the dict.

        A missing DATE defaults to today in the configured timezone. The words
        "today", "tomorrow" and "yesterday" are resolved relative to that same
        day. Any other value goes to ``dateutil.parser.parse``, whose exception
        propagates when the text cannot be parsed (e.g. a misspelled month).
        """
        if 'DATE' in self.arg_dict:
            date_str = self.arg_dict['DATE']
            day_offset = self._RELATIVE_DAYS.get(str(date_str).strip().lower())
            if day_offset is not None:
                today = self.__get_current_date_in_timezone(self.timezone)
                date_get = today + timedelta(days=day_offset)
            else:
                date_get = parser.parse(date_str)
        else:
            date_get = self.__get_current_date_in_timezone(self.timezone)

        self.arg_dict['DATE'] = date_get.strftime("%Y-%m-%d")
        return self.arg_dict

    def start_time_format(self):
        """Rewrite ``arg_dict['START_TIME']`` as ``HH:MM:SS`` and return the dict.

        When START_TIME is present but no time can be extracted from it, the
        user is prompted until a usable value is entered. When START_TIME is
        absent the dict is returned unchanged.
        """
        if 'START_TIME' in self.arg_dict:
            start_time_str = self.arg_dict['START_TIME']
            first_match = self.__first_time_match(start_time_str)

            while first_match is None:
                print('Your start time is {}, need another format'.format(start_time_str))
                start_time_str = input("please enter the start time in the suggested format HH:MM")
                first_match = self.__first_time_match(start_time_str)

            self.arg_dict['START_TIME'] = first_match.strftime("%H:%M:%S")

        # Always hand the dict back: format_all stores this return value, so a
        # None here would break every later formatter.
        return self.arg_dict

    def end_time_format(self):
        """Rewrite ``arg_dict['END_TIME']`` as ``HH:MM:SS`` and return the dict.

        Resolution order:

        1. a parseable END_TIME is used as given;
        2. otherwise END_TIME = START_TIME + DURATION (default 1 hour), and
           DURATION is stored as ``[days, hours, minutes]``;
        3. otherwise the user is prompted until a parseable time is entered.

        Only the time of day is kept, so a day component in DURATION does not
        show up in the resulting END_TIME string.
        """
        end_match = None
        if 'END_TIME' in self.arg_dict:
            end_match = self.__first_time_match(self.arg_dict['END_TIME'])

        if end_match is None and 'START_TIME' in self.arg_dict:
            start_match = self.__first_time_match(self.arg_dict['START_TIME'])
            # An unparseable START_TIME falls through to the manual prompt.
            if start_match is not None:
                if 'DURATION' in self.arg_dict:
                    duration_raw = self.arg_dict['DURATION']
                    if isinstance(duration_raw, (list, tuple)):
                        # Already normalized by an earlier run of this method.
                        duration = list(duration_raw)
                    else:
                        duration = self.__extract_duration_components(duration_raw)
                else:
                    duration = list(self._DEFAULT_DURATION)

                end_match = start_match + timedelta(days=duration[0],
                                                    hours=duration[1],
                                                    minutes=duration[2])
                self.arg_dict['DURATION'] = duration

        while end_match is None:
            print('variable need: "START_TIME", "DURATION", are  unavailable, need manual input')
            end_time_str = input("please enter the end time in the suggested format HH:MM")
            end_match = self.__first_time_match(end_time_str)

        self.arg_dict['END_TIME'] = end_match.strftime("%H:%M:%S")
        return self.arg_dict

    def format_all(self):
        """Run every formatter in ``fun_pool`` in order and return the dict."""
        for formatter in self.fun_pool:
            self.arg_dict = formatter()

        return self.arg_dict




	# if 'DATE' in arg_dict.keys:
	# 	date_str = arg_dict['DATE']
  #   	match_date = list(datefinder.find_dates(date_str)) #edge case: date = tomorrow

	# 	if len(match_date) != 0:
  #     		date = match_date[0]
  #     	else:
  #     		print(' no matched date found')
  #     		date = input("please enter the date in the format YYYY-MM-DD") # 'tomorrow' not match any, ask user again
  #     		date = date.strftime("%Y-%m-%dT")

  #     	arg_dict['DATE'] = date

  #   return arg_dict
  #       return something_else



# InputRecognition: intent detection and argument extraction

In [135]:
class InputRecognition:
  """Map a user sentence to a calendar function and its extracted arguments.

  The class relies on two module-level objects defined elsewhere in this
  notebook: ``bert_model`` (sentence encoder) and ``ner_output`` (spaCy NER
  pipeline). ``find_intent_index`` calls ``input()`` and therefore blocks when
  the sentence does not match any intent well enough.

  Args:
      user_input_sentence: the raw sentence typed by the user.
      embedded_intents_given: embeddings of the reference intent sentences,
          one row per intent, in the order expected by
          ``match_intent_to_function``.
  """

  # Minimum cosine similarity for an intent to count as recognized.
  INTENT_THRESHOLD = 0.4

  # Function name reported when the intent index is not a known intent.
  NO_INTENT_MESSAGE = 'NA - no intent found - need more information'

  def __init__(self, user_input_sentence, embedded_intents_given):

    self.embedded_intents_given = embedded_intents_given
    self.user_input_sentence = user_input_sentence
    self.function_name = ''
    self.arguments_dict = {}
    self.upper_required_arguments_list = []
    self.idx = 99  # placeholder until find_intent_index has run


  def find_intent_index(self):
    """Return the index of the reference intent closest to the user sentence.

    While the best cosine similarity is below ``INTENT_THRESHOLD`` the user is
    asked for more detail; the answer is appended to the stored sentence
    (separated by a space) and the sentence is scored again.
    """
    while True:
      user_input_embed = bert_model.encode(self.user_input_sentence)
      cosine_score = cosine_similarity([user_input_embed], self.embedded_intents_given)
      best_score = np.max(cosine_score)

      if best_score >= self.INTENT_THRESHOLD:
        break

      additional_input = input('can not find matching intents, please be more specific')
      # Join with a space so the last word of the old sentence and the first
      # word of the addition are not glued together.
      self.user_input_sentence = self.user_input_sentence + ' ' + additional_input

    # Plain int rather than a numpy integer, so it behaves like a normal key.
    self.idx = int(np.argmax(cosine_score, axis = 1)[0])
    return self.idx


  def match_intent_to_function(self): #return function name
      """Translate ``self.idx`` into a function name.

      An index that is not one of the known intents (including the initial
      placeholder) yields ``NO_INTENT_MESSAGE`` instead of ``None``.
      """
      intents = {
          0: 'create_event',
          1: 'delete_event',
          2: 'get_calendar_events',
          3: 'get_free_busy_info',
          4: 'update_event',
          5: 'reschedule_event',
          999: self.NO_INTENT_MESSAGE
      }
      self.function_name = intents.get(self.idx, self.NO_INTENT_MESSAGE)

      return self.function_name



  def get_arguments_dict(self): #, all_tag_list_name, ner_dict_raw):
    """Tag the sentence with the custom NER model and collect its entities.

    Each entity label becomes a key of ``self.arguments_dict`` holding the
    entity text. When a label occurs more than once, the first occurrence is
    kept.
    """
    ner_raw = ner_output(self.user_input_sentence)

    for ent in ner_raw.ents:
      # setdefault keeps an existing value, i.e. the first entity per label.
      self.arguments_dict.setdefault(ent.label_, ent.text)

    return self.arguments_dict


  def required_arguments(self):
      """Return the upper-cased argument names required by ``self.function_name``.

      An unknown function name yields an empty list.
      """
      required_args_mapping = {
          'create_event': ['summary', 'start_time', 'end_time'],
          'delete_event': ['event_id'],
          'get_calendar_events': ['date'],
          'get_free_busy_info': ['start_time', 'end_time'],
          'update_event': ['event_id'],
          'reschedule_event': ['event_id', 'start_time', 'end_time']
      }

      required_arguments_list = required_args_mapping.get(self.function_name, [])
      self.upper_required_arguments_list = [x.upper() for x in required_arguments_list]

      return self.upper_required_arguments_list



  def run_all(self):
    """Run intent detection, function lookup and argument extraction in order.

    Returns:
        ``(function_name, arguments_dict, upper_required_arguments_list)``.
    """
    self.idx = self.find_intent_index()
    self.function_name = self.match_intent_to_function()
    self.arguments_dict = self.get_arguments_dict()
    self.upper_required_arguments_list = self.required_arguments()

    return self.function_name, self.arguments_dict, self.upper_required_arguments_list


  def check_info_requirement(self):#function_name, arguments_list_need, arg_dict_true):
    """Print whether every required argument is present; return the missing keys.

    Compares ``self.upper_required_arguments_list`` with the keys currently in
    ``self.arguments_dict``.
    """
    missing_key = [key for key in self.upper_required_arguments_list
                   if key not in self.arguments_dict]

    if not missing_key:
      print('All required arguments are satisfied.')
    else:
      response_need_more_info = ' We need more information on {}, please add them.'.format(missing_key)
      print(response_need_more_info)

    return missing_key

# TEST IT

In [140]:
# Demo 1: a "create event" request that gives a start time and a location.
user1_raw = InputRecognition('create a shopping event at 9am, at Metro store', embedded_intents)

# Known limitation: if the input sentence is too long, the intent may not be recognized.
# run_all returns the function name, the raw NER arguments and the required keys.
func, arg, req_list = user1_raw.run_all()

print('the function is {}. It requires these keywords {}.'.format(func, req_list))
print('From the input sentence, we can get these raw keywords',arg)
print('\n')
# TimeFormat rewrites `arg` in place. `arg` is the same dict object as
# user1_raw.arguments_dict, so the requirement check below sees the values
# filled in by format_all (e.g. the derived END_TIME). Keep this order.
user1_cleaned = TimeFormat(arg, 'America/Toronto')
arguments_dict_cleaned = user1_cleaned.format_all()
user1_raw.check_info_requirement() # check if everything needed is here

print('the final output is ')
arguments_dict_cleaned

the function is create_event. It requires these keywords ['SUMMARY', 'START_TIME', 'END_TIME'].
From the input sentence, we can get these raw keywords {'START_TIME': '9am', 'LOCATION': 'Metro store'}


 We need more information on ['SUMMARY'], please add them.
the final output is 


{'START_TIME': '09:00:00',
 'LOCATION': 'Metro store',
 'DATE': '2023-07-30',
 'END_TIME': '10:00:00',
 'DURATION': [0, 1, 0]}

In [141]:
# Demo 2: a "reschedule" request that only mentions a start time.
input2 = "reschedule todays 10pm event."

user2_raw = InputRecognition(input2, embedded_intents)

# Known limitation: if the input sentence is too long, the intent may not be recognized.
# run_all returns the function name, the raw NER arguments and the required keys.
func, arg, req_list = user2_raw.run_all()

print('the function is {}. It requires these keywords {}.'.format(func, req_list))
print('From the input sentence, we can get these raw keywords',arg)
print('\n')
# No timezone is passed, so TimeFormat uses its default ('America/Toronto').
# `arg` is the same dict object as user2_raw.arguments_dict and is rewritten
# in place, so the check below sees the formatted values. Keep this order.
user_cleaned = TimeFormat(arg)
arguments_dict_cleaned = user_cleaned.format_all()

user2_raw.check_info_requirement() # check if everything needed is here

print('the final output is ')
arguments_dict_cleaned

the function is reschedule_event. It requires these keywords ['EVENT_ID', 'START_TIME', 'END_TIME'].
From the input sentence, we can get these raw keywords {'START_TIME': '10pm'}


 We need more information on ['EVENT_ID'], please add them.
the final output is 


{'START_TIME': '22:00:00',
 'DATE': '2023-07-30',
 'END_TIME': '23:00:00',
 'DURATION': [0, 1, 0]}