In [2]:
import tensorflow as tf
import tensorflow_hub as hub
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import seaborn as sns
import requests
import re
from bs4 import BeautifulSoup

In [6]:
class Pipeline():
    """Answer questions about a Medium article with sentence embeddings.

    The article behind ``medium_content_url`` is downloaded and split into text
    fragments. A TensorFlow Hub text module embeds the fragments and the
    question, and fragments are ranked by the inner product of the embeddings,
    optionally combined with a word-level Jaccard score. The class also holds
    helpers that train a DNN classifier on the aclImdb review dataset on top
    of the same module.

    The TensorFlow code relies on the 1.x graph API (``tf.Session``,
    ``hub.Module``, ``tf.estimator.inputs``).

    NOTE: a later cell of this notebook defines ``Pipeline`` again; once that
    cell runs, its definition shadows this one.
    """

    def __init__(self, medium_content_url, module_url = 'https://tfhub.dev/google/universal-sentence-encoder/2'):
        """Download the article text and load the embedding module.

        medium_content_url: URL of the Medium post to read.
        module_url: TensorFlow Hub handle of the text embedding module.

        Raises AssertionError when either argument is not a string.
        """
        assert isinstance(medium_content_url, str), 'medium_content_url must be a str'
        assert isinstance(module_url, str), 'module_url must be a str'
        self.medium_content_url = medium_content_url
        self.module_url = module_url
        # State that is filled in later. It is initialised up front so that
        # calling the public methods out of order fails with a clear message.
        self.question = None
        self.presentation_dataframe = None
        self.similarity_tensor = None
        self.__similarity_tensor = None
        self.__jaccard_similarity_score = None
        self.__train_df = None
        self.__test_df = None
        self.__prepare_text_from_html(self.medium_content_url)
        self.__loading_module(self.module_url)

    def __content_downloader(self, url):
        """Fetch ``url`` and return the article container as an HTML string.

        Raises requests.HTTPError on an error status and ValueError when the
        page has no Medium article container.
        """
        response = requests.get(url, timeout = 30)
        response.raise_for_status()
        # An explicit parser keeps the result independent of which optional
        # parsers happen to be installed.
        soup = BeautifulSoup(response.content, 'html.parser')
        article = soup.find("div", {"class": "postArticle-content js-postField js-notesSource js-trackedPost"})
        if article is None:
            # str(None) used to be parsed as if it were the article.
            raise ValueError('no Medium article container found at ' + url)
        return str(article)

    def __cleaning_text(self, html_part):
        """Strip the tags from ``html_part`` and return its text fragments.

        Tags are replaced by a space and the text is split wherever two or
        more whitespace characters meet. Empty fragments are dropped, so no
        fragment is lost when the markup does not start or end with a run of
        tags.
        """
        without_tags = re.sub(r'<[A-Za-z/][^>]*>', ' ', str(html_part))
        sentences = []
        for fragment in re.split(r'\s{2,}', without_tags):
            for artefact in ('\xa0', '\u200a—\u200a', '\xe2\x80\x99', '\xc2'):
                fragment = fragment.replace(artefact, '')
            fragment = fragment.strip()
            if fragment:
                sentences.append(fragment)
        return sentences

    def __prepare_text_from_html(self, url):
        """Download ``url`` and store its cleaned fragments in ``self.text``."""
        html = self.__content_downloader(url)
        self.text = self.__cleaning_text(html)

    def __loading_module(self, module_url):
        """Load the TensorFlow Hub text module named by ``module_url``."""
        self.__embed_object = hub.Module(module_url)

    def __run_embedding(self, embed_object, text):
        """Return the embeddings of the strings in ``text`` as an array."""
        # Reduce logging output.
        tf.logging.set_verbosity(tf.logging.ERROR)

        with tf.Session() as session:
            session.run([tf.global_variables_initializer(), tf.tables_initializer()])
            message_embeddings = session.run(embed_object(text))

        return message_embeddings

    def __calculating_similarity_tensor(self, question, text):
        """Return the inner products between question and text embeddings.

        The result has one row per entry of ``question`` and one column per
        entry of ``text``. The product is computed with NumPy so that no extra
        variables or sessions are added to the TensorFlow graph on each call.
        """
        question_embeddings = self.__run_embedding(self.__embed_object, question)
        text_embeddings = self.__run_embedding(self.__embed_object, text)
        return np.inner(question_embeddings, text_embeddings)

    def __find_the_most_similar_sentence(self, similarity_tensor):
        """Print the best score and return question and best fragment.

        The return value is a (2, 1) array: the question on the first row and
        the highest scoring fragment on the second.
        """
        self.similarity_score = round(np.max(similarity_tensor), 3)
        print('similarity score for the most similar sentence is ', self.similarity_score)
        return np.hstack([self.question, self.text[np.argmax(similarity_tensor)]]).reshape(-1, 1)

    def __check_question_was_asked(self):
        """Raise RuntimeError unless a question has been asked already."""
        if self.question is None or self.__similarity_tensor is None:
            raise RuntimeError('call ask_question_from_text_to_see_answer() first')

    def __rank_sentences(self, scores):
        """Pair ``scores`` with ``self.text`` and rank them, best first.

        Returns a list of ``(score, sentence)`` tuples. Of several entries
        with the same score only the first one in sort order is kept.
        """
        ranked = []
        for score, sentence in sorted(zip(scores, self.text), reverse = True):
            value = float(score)
            if ranked and ranked[-1][0] == value:
                continue
            ranked.append((value, sentence))
        return ranked

    def __presentation_frame(self, ranked):
        """Build the result frame: the question first, then ``ranked``."""
        rows = [(self.question[0], None)]
        rows.extend((sentence, score) for score, sentence in ranked)
        return pd.DataFrame(rows, columns = ['sentence', 'similarity score'])

    def ask_question_from_text_to_see_answer(self, question):
        """Score every fragment against ``question`` and return the best one.

        Returns a (2, 1) array holding the question and the best fragment.
        """
        self.question = [question]
        self.__similarity_tensor = self.__calculating_similarity_tensor(self.question, self.text)
        # Keep the public attribute set in __init__ in step with the scores.
        self.similarity_tensor = self.__similarity_tensor
        self.most_similar_sentence = self.__find_the_most_similar_sentence(self.__similarity_tensor)
        return self.most_similar_sentence

    def information_about_similar_sentences(self, threshold = 0.3):
        """Return the fragments whose similarity score exceeds ``threshold``.

        The returned DataFrame has the columns 'sentence' and
        'similarity score'. Its first row is the question, which has no
        score. ``self.sorted_similarity_array`` keeps every ranked
        ``[score, sentence]`` pair, whatever the threshold.

        Raises RuntimeError when no question has been asked yet.
        """
        self.__check_question_was_asked()
        ranked = self.__rank_sentences(self.__similarity_tensor[0])
        # An object array keeps the scores numeric and the text untruncated.
        self.sorted_similarity_array = np.array(ranked, dtype = object).reshape(-1, 2)
        selected = [(score, sentence) for score, sentence in ranked if score > threshold]
        self.presentation_dataframe = self.__presentation_frame(selected)
        return self.presentation_dataframe

    def plot_similarity(self, rotation = 90):
        """Draw a heatmap of the pairwise similarity of the selected fragments.

        Uses the sentences of ``self.presentation_dataframe``, so
        ``information_about_similar_sentences`` has to be called first;
        otherwise RuntimeError is raised.
        """
        if self.presentation_dataframe is None:
            raise RuntimeError('call information_about_similar_sentences() first')
        sentences = list(self.presentation_dataframe['sentence'])
        # Both axes show the same sentences, so one embedding pass is enough.
        embeddings = self.__run_embedding(self.__embed_object, sentences)
        scores = np.inner(embeddings, embeddings)
        sns.set(font_scale = 1.2)
        g = sns.heatmap(
                scores,
                xticklabels = sentences,
                yticklabels = sentences,
                vmin = 0,
                vmax = 1,
                cmap = "Blues")
        g.set_xticklabels(sentences, rotation=rotation)
        g.set_title("Semantic Textual Similarity")

    def __get_Jaccard_similarity(self, question, sentence):
        """Return the Jaccard similarity of the word sets, rounded to 3 places.

        An argument that is not a string is replaced by its first element.
        Words are split on whitespace. Two empty inputs score 0.0.
        """
        if not isinstance(question, str):
            question = question[0]
        if not isinstance(sentence, str):
            sentence = sentence[0]
        question_words = set(question.split())
        sentence_words = set(sentence.split())
        union = question_words | sentence_words
        if not union:
            return 0.0
        return round(len(question_words & sentence_words) / len(union), 3)

    def __find_Jaccard_similarity(self, question, text):
        """Score every entry of ``text`` against ``question`` with Jaccard.

        Returns ``(result, scores)``: ``result`` is an (n, 2) object array of
        ``[score, sentence]`` rows, best first, and ``scores`` is a (1, n)
        array in the order of ``text``.
        """
        scores = np.array([self.__get_Jaccard_similarity(question, sentence) for sentence in text],
                          dtype = float)
        self.__jaccard_similarity_score = scores
        ranked = sorted(zip(scores.tolist(), text), reverse = True)
        result = np.array(ranked, dtype = object).reshape(-1, 2)
        return result, scores.reshape(1, -1)

    def final_result(self):
        """Rank the fragments by embedding score plus Jaccard score.

        Returns a DataFrame with the columns 'sentence' and
        'similarity score'; the first row is the question, which has no
        score. The frame is also stored in ``self.final_result_dataframe``.
        It is not stored under ``final_result`` because that would replace
        this method and break the next call.

        Raises RuntimeError when no question has been asked yet.
        """
        self.__check_question_was_asked()
        _, jaccard_scores = self.__find_Jaccard_similarity(self.question, self.text)
        summation = self.__similarity_tensor + jaccard_scores
        ranked = self.__rank_sentences(summation[0])
        self.final_result_dataframe = self.__presentation_frame(ranked)
        return self.final_result_dataframe

    def __load_directory_data(self, directory):
        """Load the review files of ``directory`` into a DataFrame.

        The frame has the columns 'sentence' (file content) and 'sentiment'
        (the number after the underscore in the file name). Files whose name
        does not start with ``<digits>_<digits>.txt`` are skipped.
        """
        name_pattern = re.compile(r"\d+_(\d+)\.txt")
        data = {}
        data["sentence"] = []
        data["sentiment"] = []
        for file_name in os.listdir(directory):
            match = name_pattern.match(file_name)
            if match is None:
                continue
            with tf.gfile.GFile(os.path.join(directory, file_name), "r") as f:
                data["sentence"].append(f.read())
            data["sentiment"].append(match.group(1))
        return pd.DataFrame.from_dict(data)

    def __load_dataset(self, directory):
        """Merge the 'pos' and 'neg' examples, add 'polarity' and shuffle."""
        pos_df = self.__load_directory_data(os.path.join(directory, "pos"))
        neg_df = self.__load_directory_data(os.path.join(directory, "neg"))
        pos_df["polarity"] = 1
        neg_df["polarity"] = 0
        return pd.concat([pos_df, neg_df]).sample(frac=1).reset_index(drop=True)

    def __download_and_load_datasets(self, force_download=False):
        """Download the aclImdb archive and return ``(train_df, test_df)``.

        ``force_download`` is accepted but not used.
        """
        dataset = tf.keras.utils.get_file(
            fname="aclImdb.tar.gz",
            origin="http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz",
            extract=True)

        train_df = self.__load_dataset(os.path.join(os.path.dirname(dataset),
                                 "aclImdb", "train"))
        test_df = self.__load_dataset(os.path.join(os.path.dirname(dataset),
                                "aclImdb", "test"))

        return train_df, test_df

    def __input_functions(self):
        """Create the estimator input functions for training and evaluation."""
        # The data frames are loaded once and reused by later calls.
        if self.__train_df is None or self.__test_df is None:
            self.__train_df, self.__test_df = self.__download_and_load_datasets()
        self.__train_input_function = tf.estimator.inputs.pandas_input_fn(self.__train_df, self.__train_df["polarity"],
                                                                          num_epochs=None, shuffle=True)
        self.__predict_train_input_function = tf.estimator.inputs.pandas_input_fn(self.__train_df, self.__train_df["polarity"],
                                                                                  shuffle=False)
        self.__predict_test_input_function = tf.estimator.inputs.pandas_input_fn(self.__test_df, self.__test_df["polarity"],
                                                                                 shuffle=False)

    def __train_and_evaluate_with_module(self, learning_rate = 0.003, steps = 1000, trainable = False):
        """Train a DNN classifier on top of the text module and evaluate it.

        learning_rate: learning rate of the Adagrad optimizer.
        steps: number of training steps.
        trainable: passed to ``hub.text_embedding_column``.

        Returns a dict with the keys 'Training accuracy' and 'Test accuracy'.
        """
        self.__embedded_text_feature_column = hub.text_embedding_column(key = 'sentence', module_spec = self.module_url,
                                                                 trainable = trainable)

        self.__input_functions()

        estimator = tf.estimator.DNNClassifier(
            hidden_units = [500, 100],
            feature_columns = [self.__embedded_text_feature_column],
            n_classes = 2,
            optimizer = tf.train.AdagradOptimizer(learning_rate = learning_rate))

        estimator.train(input_fn = self.__train_input_function, steps = steps)

        self.__train_eval_result = estimator.evaluate(input_fn = self.__predict_train_input_function)
        self.__test_eval_result = estimator.evaluate(input_fn = self.__predict_test_input_function)

        self.__training_set_accuracy = self.__train_eval_result['accuracy']
        self.__test_set_accuracy = self.__test_eval_result['accuracy']

        return {
            "Training accuracy": self.__training_set_accuracy,
            "Test accuracy": self.__test_set_accuracy
        }

    def comparing_module_with_trainable_option(self, learning_rate = 0.003, steps = 1000):
        """Train once with ``trainable`` off and once with it on.

        Returns a dict with the keys 'train_off' and 'train_on', each holding
        the accuracies of the corresponding run.
        """
        trainable_off = self.__train_and_evaluate_with_module(learning_rate = learning_rate, steps = steps, trainable = False)
        trainable_on = self.__train_and_evaluate_with_module(learning_rate = learning_rate, steps = steps, trainable = True)
        return {
            'train_off': trainable_off,
            'train_on': trainable_on
        }

In [None]:
class Pipeline():
    """Answer questions about a Medium article with sentence embeddings.

    The article behind ``medium_content_url`` is downloaded and split into text
    fragments. A TensorFlow Hub text module embeds the fragments and the
    question, and fragments are ranked by the inner product of the embeddings,
    optionally combined with a word-level Jaccard score. The class also holds
    helpers that train a DNN classifier on the aclImdb review dataset on top
    of the same module.

    The TensorFlow code relies on the 1.x graph API (``tf.Session``,
    ``hub.Module``, ``tf.estimator.inputs``).
    """

    def __init__(self, medium_content_url, module_url = 'https://tfhub.dev/google/universal-sentence-encoder/2'):
        """Download the article text and load the embedding module.

        medium_content_url: URL of the Medium post to read.
        module_url: TensorFlow Hub handle of the text embedding module.

        Raises AssertionError when either argument is not a string.
        """
        assert isinstance(medium_content_url, str), 'medium_content_url must be a str'
        assert isinstance(module_url, str), 'module_url must be a str'
        self.medium_content_url = medium_content_url
        self.module_url = module_url
        # State that is filled in later. It is initialised up front so that
        # calling the public methods out of order fails with a clear message.
        self.question = None
        self.presentation_dataframe = None
        self.__similarity_tensor = None
        self.__jaccard_similarity_score = None
        self.__train_df = None
        self.__test_df = None
        self.__prepare_text_from_html(self.medium_content_url)
        self.__loading_module(self.module_url)

    def __content_downloader(self, url):
        """Fetch ``url`` and return the article container as an HTML string.

        Raises requests.HTTPError on an error status and ValueError when the
        page has no Medium article container.
        """
        response = requests.get(url, timeout = 30)
        response.raise_for_status()
        # An explicit parser keeps the result independent of which optional
        # parsers happen to be installed.
        soup = BeautifulSoup(response.content, 'html.parser')
        article = soup.find("div", {"class": "postArticle-content js-postField js-notesSource js-trackedPost"})
        if article is None:
            # str(None) used to be parsed as if it were the article.
            raise ValueError('no Medium article container found at ' + url)
        return str(article)

    def __cleaning_text(self, html_part):
        """Strip the tags from ``html_part`` and return its text fragments.

        Tags are replaced by a space and the text is split wherever two or
        more whitespace characters meet. Empty fragments are dropped, so no
        fragment is lost when the markup does not start or end with a run of
        tags.
        """
        without_tags = re.sub(r'<[A-Za-z/][^>]*>', ' ', str(html_part))
        sentences = []
        for fragment in re.split(r'\s{2,}', without_tags):
            for artefact in ('\xa0', '\u200a—\u200a', '\xe2\x80\x99', '\xc2'):
                fragment = fragment.replace(artefact, '')
            fragment = fragment.strip()
            if fragment:
                sentences.append(fragment)
        return sentences

    def __prepare_text_from_html(self, url):
        """Download ``url`` and store its cleaned fragments in ``self.text``."""
        html = self.__content_downloader(url)
        self.text = self.__cleaning_text(html)

    def __loading_module(self, module_url):
        """Load the TensorFlow Hub text module named by ``module_url``."""
        self.__embed_object = hub.Module(module_url)

    def __run_embedding(self, embed_object, text):
        """Return the embeddings of the strings in ``text`` as an array."""
        # Reduce logging output.
        tf.logging.set_verbosity(tf.logging.ERROR)

        with tf.Session() as session:
            session.run([tf.global_variables_initializer(), tf.tables_initializer()])
            message_embeddings = session.run(embed_object(text))

        return message_embeddings

    def __calculating_similarity_tensor(self, question, text):
        """Return the inner products between question and text embeddings.

        The result has one row per entry of ``question`` and one column per
        entry of ``text``. The product is computed with NumPy so that no extra
        variables or sessions are added to the TensorFlow graph on each call.
        """
        question_embeddings = self.__run_embedding(self.__embed_object, question)
        text_embeddings = self.__run_embedding(self.__embed_object, text)
        return np.inner(question_embeddings, text_embeddings)

    def __find_the_most_similar_sentence(self, similarity_tensor):
        """Print the best score and return question and best fragment.

        The return value is a (2, 1) array: the question on the first row and
        the highest scoring fragment on the second.
        """
        self.similarity_score = round(np.max(similarity_tensor), 3)
        print('similarity score for the most similar sentence is ', self.similarity_score)
        return np.hstack([self.question, self.text[np.argmax(similarity_tensor)]]).reshape(-1, 1)

    def __check_question_was_asked(self):
        """Raise RuntimeError unless a question has been asked already."""
        if self.question is None or self.__similarity_tensor is None:
            raise RuntimeError('call ask_question_from_text_to_see_answer() first')

    def __rank_sentences(self, scores):
        """Pair ``scores`` with ``self.text`` and rank them, best first.

        Returns a list of ``(score, sentence)`` tuples. Of several entries
        with the same score only the first one in sort order is kept.
        """
        ranked = []
        for score, sentence in sorted(zip(scores, self.text), reverse = True):
            value = float(score)
            if ranked and ranked[-1][0] == value:
                continue
            ranked.append((value, sentence))
        return ranked

    def __presentation_frame(self, ranked):
        """Build the result frame: the question first, then ``ranked``."""
        rows = [(self.question[0], None)]
        rows.extend((sentence, score) for score, sentence in ranked)
        return pd.DataFrame(rows, columns = ['sentence', 'similarity score'])

    def ask_question_from_text_to_see_answer(self, question):
        """Score every fragment against ``question`` and return the best one.

        Returns a (2, 1) array holding the question and the best fragment.
        """
        self.question = [question]
        self.__similarity_tensor = self.__calculating_similarity_tensor(self.question, self.text)
        self.most_similar_sentence = self.__find_the_most_similar_sentence(self.__similarity_tensor)
        return self.most_similar_sentence

    def information_about_similar_sentences(self, threshold = 0.3):
        """Return the fragments whose similarity score exceeds ``threshold``.

        The returned DataFrame has the columns 'sentence' and
        'similarity score'. Its first row is the question, which has no
        score. ``self.sorted_similarity_array`` keeps every ranked
        ``[score, sentence]`` pair, whatever the threshold.

        Raises RuntimeError when no question has been asked yet.
        """
        self.__check_question_was_asked()
        ranked = self.__rank_sentences(self.__similarity_tensor[0])
        # An object array keeps the scores numeric and the text untruncated.
        self.sorted_similarity_array = np.array(ranked, dtype = object).reshape(-1, 2)
        selected = [(score, sentence) for score, sentence in ranked if score > threshold]
        self.presentation_dataframe = self.__presentation_frame(selected)
        return self.presentation_dataframe

    def plot_similarity(self, rotation = 90):
        """Draw a heatmap of the pairwise similarity of the selected fragments.

        Uses the sentences of ``self.presentation_dataframe``, so
        ``information_about_similar_sentences`` has to be called first;
        otherwise RuntimeError is raised.
        """
        if self.presentation_dataframe is None:
            raise RuntimeError('call information_about_similar_sentences() first')
        sentences = list(self.presentation_dataframe['sentence'])
        # Both axes show the same sentences, so one embedding pass is enough.
        embeddings = self.__run_embedding(self.__embed_object, sentences)
        scores = np.inner(embeddings, embeddings)
        sns.set(font_scale = 1.2)
        g = sns.heatmap(
                scores,
                xticklabels = sentences,
                yticklabels = sentences,
                vmin = 0,
                vmax = 1,
                cmap = "Blues")
        g.set_xticklabels(sentences, rotation=rotation)
        g.set_title("Semantic Textual Similarity")

    def __get_Jaccard_similarity(self, question, sentence):
        """Return the Jaccard similarity of the word sets, rounded to 3 places.

        An argument that is not a string is replaced by its first element.
        Words are split on whitespace. Two empty inputs score 0.0.
        """
        if not isinstance(question, str):
            question = question[0]
        if not isinstance(sentence, str):
            sentence = sentence[0]
        question_words = set(question.split())
        sentence_words = set(sentence.split())
        union = question_words | sentence_words
        if not union:
            return 0.0
        return round(len(question_words & sentence_words) / len(union), 3)

    def find_Jaccard_similarity(self, question, text):
        """Score every entry of ``text`` against ``question`` with Jaccard.

        Returns an (n, 2) object array of ``[score, sentence]`` rows, best
        first. The scores are also kept internally, in the order of ``text``
        and with shape (1, n), for ``final_result``.
        """
        scores = np.array([self.__get_Jaccard_similarity(question, sentence) for sentence in text],
                          dtype = float)
        ranked = sorted(zip(scores.tolist(), text), reverse = True)
        # The row length follows the text instead of a fixed 180 sentences.
        self.__jaccard_similarity_score = scores.reshape(1, -1)
        return np.array(ranked, dtype = object).reshape(-1, 2)

    def final_result(self):
        """Rank the fragments by embedding score plus Jaccard score.

        Returns a DataFrame with the columns 'sentence' and
        'similarity score'; the first row is the question, which has no
        score. The frame is also stored in ``self.final_result_dataframe``.
        It is not stored under ``final_result`` because that would replace
        this method and break the next call.

        Raises RuntimeError when no question has been asked yet.
        """
        self.__check_question_was_asked()
        self.find_Jaccard_similarity(self.question, self.text)
        summation = self.__similarity_tensor + self.__jaccard_similarity_score
        ranked = self.__rank_sentences(summation[0])
        self.final_result_dataframe = self.__presentation_frame(ranked)
        return self.final_result_dataframe

    def __load_directory_data(self, directory):
        """Load the review files of ``directory`` into a DataFrame.

        The frame has the columns 'sentence' (file content) and 'sentiment'
        (the number after the underscore in the file name). Files whose name
        does not start with ``<digits>_<digits>.txt`` are skipped.
        """
        name_pattern = re.compile(r"\d+_(\d+)\.txt")
        data = {}
        data["sentence"] = []
        data["sentiment"] = []
        for file_name in os.listdir(directory):
            match = name_pattern.match(file_name)
            if match is None:
                continue
            with tf.gfile.GFile(os.path.join(directory, file_name), "r") as f:
                data["sentence"].append(f.read())
            data["sentiment"].append(match.group(1))
        return pd.DataFrame.from_dict(data)

    def __load_dataset(self, directory):
        """Merge the 'pos' and 'neg' examples, add 'polarity' and shuffle."""
        pos_df = self.__load_directory_data(os.path.join(directory, "pos"))
        neg_df = self.__load_directory_data(os.path.join(directory, "neg"))
        pos_df["polarity"] = 1
        neg_df["polarity"] = 0
        return pd.concat([pos_df, neg_df]).sample(frac=1).reset_index(drop=True)

    def __download_and_load_datasets(self, force_download=False):
        """Download the aclImdb archive and return ``(train_df, test_df)``.

        ``force_download`` is accepted but not used.
        """
        dataset = tf.keras.utils.get_file(
            fname="aclImdb.tar.gz",
            origin="http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz",
            extract=True)

        train_df = self.__load_dataset(os.path.join(os.path.dirname(dataset),
                                 "aclImdb", "train"))
        test_df = self.__load_dataset(os.path.join(os.path.dirname(dataset),
                                "aclImdb", "test"))

        return train_df, test_df

    def __input_functions(self):
        """Create the estimator input functions for training and evaluation."""
        # The data frames are loaded once and reused by later calls.
        if self.__train_df is None or self.__test_df is None:
            self.__train_df, self.__test_df = self.__download_and_load_datasets()
        self.__train_input_function = tf.estimator.inputs.pandas_input_fn(self.__train_df, self.__train_df["polarity"],
                                                                          num_epochs=None, shuffle=True)
        self.__predict_train_input_function = tf.estimator.inputs.pandas_input_fn(self.__train_df, self.__train_df["polarity"],
                                                                                  shuffle=False)
        self.__predict_test_input_function = tf.estimator.inputs.pandas_input_fn(self.__test_df, self.__test_df["polarity"],
                                                                                 shuffle=False)

    def __train_and_evaluate_with_module(self, learning_rate = 0.003, steps = 1000, trainable = False):
        """Train a DNN classifier on top of the text module and evaluate it.

        learning_rate: learning rate of the Adagrad optimizer.
        steps: number of training steps.
        trainable: passed to ``hub.text_embedding_column``.

        Returns a dict with the keys 'Training accuracy' and 'Test accuracy'.
        """
        self.__embedded_text_feature_column = hub.text_embedding_column(key = 'sentence', module_spec = self.module_url,
                                                                 trainable = trainable)

        self.__input_functions()

        estimator = tf.estimator.DNNClassifier(
            hidden_units = [500, 100],
            feature_columns = [self.__embedded_text_feature_column],
            n_classes = 2,
            optimizer = tf.train.AdagradOptimizer(learning_rate = learning_rate))

        estimator.train(input_fn = self.__train_input_function, steps = steps)

        self.__train_eval_result = estimator.evaluate(input_fn = self.__predict_train_input_function)
        self.__test_eval_result = estimator.evaluate(input_fn = self.__predict_test_input_function)

        self.__training_set_accuracy = self.__train_eval_result['accuracy']
        self.__test_set_accuracy = self.__test_eval_result['accuracy']

        return {
            "Training accuracy": self.__training_set_accuracy,
            "Test accuracy": self.__test_set_accuracy
        }

    def comparing_module_with_trainable_option(self, learning_rate = 0.003, steps = 1000):
        """Train once with ``trainable`` off and once with it on.

        Returns a dict with the keys 'train_off' and 'train_on', each holding
        the accuracies of the corresponding run.
        """
        trainable_off = self.__train_and_evaluate_with_module(learning_rate = learning_rate, steps = steps, trainable = False)
        trainable_on = self.__train_and_evaluate_with_module(learning_rate = learning_rate, steps = steps, trainable = True)
        return {
            'train_off': trainable_off,
            'train_on': trainable_on
        }

In [None]:
# The second argument used to be the literal string 'module_url', which is not
# a TensorFlow Hub handle; leaving it out selects the default module declared
# in Pipeline.__init__.
example = Pipeline('https://blog.insightdatascience.com/reinforcement-learning-from-scratch-819b65f074d8')