In [1]:
from google.colab import drive

In [None]:
drive.mount('/content/drive',force_remount=True)

In [None]:
!pip install tensorflow

In [None]:
!pip install transformers==4.31.0

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import transformers

In [None]:
# Token length every encoded sentence pair is padded to; also the width of the model's inputs.
max_length = 128  # Maximum length of input sentence to the model.
batch_size = 32  # Default batch size of BertSemanticDataGenerator.
epochs = 2  # Number of training epochs — presumably for model.fit; no cell visible here uses it.

In [None]:
# Class names. check_similarity() indexes this list with the argmax of the model
# output, so the order has to match the order of the model's output units.
labels = ["contradiction", "entailment", "neutral"]

In [None]:
class BertSemanticDataGenerator(tf.keras.utils.Sequence):
    """Generates batches of BERT-encoded sentence pairs.

    Args:
        sentence_pairs: Array of premise and hypothesis input sentences.
            It is indexed with an integer index array and converted with
            `.tolist()`, so a NumPy array is expected.
        labels: Array of labels (may be None when `include_targets=False`).
        batch_size: Integer batch size.
        shuffle: boolean, whether to shuffle the data.
        include_targets: boolean, whether to include the labels.

    Returns:
        Tuples `([input_ids, attention_mask, token_type_ids], labels)`
        (or just `[input_ids, attention_mask, token_type_ids]`
        if `include_targets=False`)
    """

    def __init__(
        self,
        sentence_pairs,
        labels,
        batch_size=batch_size,
        shuffle=True,
        include_targets=True,
    ):
        self.sentence_pairs = sentence_pairs
        self.labels = labels
        self.shuffle = shuffle
        self.batch_size = batch_size
        self.include_targets = include_targets
        # Load our BERT Tokenizer to encode the text.
        # We will use the bert-base-uncased pretrained model.
        self.tokenizer = transformers.BertTokenizer.from_pretrained(
            "bert-base-uncased", do_lower_case=True
        )
        self.indexes = np.arange(len(self.sentence_pairs))
        # Apply the initial shuffle (no-op when shuffle is False).
        self.on_epoch_end()

    def __len__(self):
        # Denotes the number of batches per epoch.
        # Floor division: a trailing partial batch is not counted.
        return len(self.sentence_pairs) // self.batch_size

    def __getitem__(self, idx):
        # Retrieves the batch of index.
        indexes = self.indexes[idx * self.batch_size : (idx + 1) * self.batch_size]
        sentence_pairs = self.sentence_pairs[indexes]

        # With BERT tokenizer's batch_encode_plus batch of both the sentences are
        # encoded together and separated by [SEP] token.
        # `padding`/`truncation` replace the deprecated `pad_to_max_length`
        # flag: every pair is padded to, and cut at, the module-level max_length.
        encoded = self.tokenizer.batch_encode_plus(
            sentence_pairs.tolist(),
            add_special_tokens=True,
            max_length=max_length,
            return_attention_mask=True,
            return_token_type_ids=True,
            padding="max_length",
            truncation=True,
            return_tensors="tf",
        )

        # Convert batch of encoded features to numpy array.
        input_ids = np.array(encoded["input_ids"], dtype="int32")
        attention_masks = np.array(encoded["attention_mask"], dtype="int32")
        token_type_ids = np.array(encoded["token_type_ids"], dtype="int32")

        # Set to true if data generator is used for training/validation.
        if self.include_targets:
            labels = np.array(self.labels[indexes], dtype="int32")
            return [input_ids, attention_masks, token_type_ids], labels
        else:
            return [input_ids, attention_masks, token_type_ids]

    def on_epoch_end(self):
        # Shuffle indexes after each epoch if shuffle is set to True.
        # A new RandomState(42) is created on every call, so the same
        # permutation is applied to the current index order each time.
        if self.shuffle:
            np.random.RandomState(42).shuffle(self.indexes)



In [None]:
# Build and compile the classifier inside a MirroredStrategy scope.
# The inference helper check_similarity() predicts with `loaded_model`, not with
# the `model` object created here.
strategy = tf.distribute.MirroredStrategy()

with strategy.scope():
    # Encoded token ids from BERT tokenizer.
    input_ids = tf.keras.layers.Input(
        shape=(max_length,), dtype=tf.int32, name="input_ids"
    )
    # Attention masks indicate to the model which tokens should be attended to.
    attention_masks = tf.keras.layers.Input(
        shape=(max_length,), dtype=tf.int32, name="attention_masks"
    )
    # Token type ids are binary masks identifying different sequences in the model.
    token_type_ids = tf.keras.layers.Input(
        shape=(max_length,), dtype=tf.int32, name="token_type_ids"
    )
    # Loading pretrained BERT model.
    bert_model = transformers.TFBertModel.from_pretrained("bert-base-uncased")
    # Freeze the BERT model to reuse the pretrained features without modifying them.
    bert_model.trainable = False

    # Call the inner BERT main layer directly on the three symbolic inputs.
    bert_output = bert_model.bert(
        input_ids, attention_mask=attention_masks, token_type_ids=token_type_ids
    )
    sequence_output = bert_output.last_hidden_state
    # pooled_output is assigned but not consumed by any layer below.
    pooled_output = bert_output.pooler_output
    # Add trainable layers on top of frozen layers to adapt the pretrained features on the new data.
    bi_lstm = tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(64, return_sequences=True)
    )(sequence_output)
    # Applying hybrid pooling approach to bi_lstm sequence output:
    # average- and max-pooled summaries are concatenated.
    avg_pool = tf.keras.layers.GlobalAveragePooling1D()(bi_lstm)
    max_pool = tf.keras.layers.GlobalMaxPooling1D()(bi_lstm)
    concat = tf.keras.layers.concatenate([avg_pool, max_pool])
    dropout = tf.keras.layers.Dropout(0.3)(concat)
    # Three softmax units, one per entry of the module-level `labels` list.
    output = tf.keras.layers.Dense(3, activation="softmax")(dropout)
    model = tf.keras.models.Model(
        inputs=[input_ids, attention_masks, token_type_ids], outputs=output
    )

    # NOTE(review): categorical_crossentropy expects one-hot targets; label
    # preparation is not shown in this notebook — confirm before training.
    model.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss="categorical_crossentropy",
        metrics=["acc"],
    )


print(f"Strategy: {strategy}")
model.summary()


In [None]:
from tensorflow import keras

# Restore the saved classifier from the mounted Drive; check_similarity() runs
# its predictions with this object.
loaded_model = tf.keras.models.load_model('/content/drive/MyDrive/Bert/loaded_model.h5')

def check_similarity(sentence1, sentence2):
    """Classify the relation between two sentences with ``loaded_model``.

    Both arguments are converted with ``str()`` and encoded as a single
    one-pair batch by ``BertSemanticDataGenerator``.

    Returns:
        A ``(label, score)`` tuple: the entry of the module-level ``labels``
        list with the highest predicted score, and that score as a string with
        two decimals followed by ``%``. The raw model output is formatted
        as-is (it is not multiplied by 100).
    """
    pair = np.array([[str(sentence1), str(sentence2)]])
    generator = BertSemanticDataGenerator(
        pair,
        labels=None,
        batch_size=1,
        shuffle=False,
        include_targets=False,
    )

    # Only one batch exists; predict on it and keep the single result row.
    first_batch = generator[0]
    scores = loaded_model.predict(first_batch)[0]
    best = np.argmax(scores)
    return labels[best], f"{scores[best]: .2f}%"


In [None]:
import re
import warnings
import pandas as pd

def _parse_question_answers(content):
    """Split a "Question: ... Answer: ..." text into a {question: answer} dict.

    The text is tokenized into words and the punctuation marks ``.,;!?:``;
    tokens are re-joined with single spaces. Keys therefore look like
    ``"Question : What is ice ?"`` and values like ``"Answer : Frozen water ."``.
    Text in front of a "Question:" marker that does not belong to an answer is
    discarded, and pairs without any question text are dropped.
    """
    # Remove numeric indices before "Question" and "Answer" words, preserving the colon.
    cleaned = re.sub(r'(\b\d+\.\s*Question:)', 'Question:', content)
    cleaned = re.sub(r'(\b\d+\.\s*Answer:)', 'Answer:', cleaned)

    # Words and individual punctuation marks, in order of appearance.
    tokens = re.findall(r'\b\w+\b|[.,;!?:]', cleaned)

    pairs = {}
    current_key = None  # question text still waiting for its answer
    buffer = []  # tokens of the question or answer being collected

    for position, token in enumerate(tokens):
        # Bounds check: a marker word may be the very last token.
        followed_by_colon = (
            position + 1 < len(tokens) and tokens[position + 1] == ":"
        )
        if followed_by_colon and token == "Answer":
            # Everything collected so far is the question text.
            current_key = " ".join(buffer)
            buffer = []
        elif followed_by_colon and token == "Question":
            # A new question closes the pending question/answer pair, if any.
            if current_key:
                pairs[current_key] = " ".join(buffer)
            # Forget the stored question so a later unanswered question cannot
            # overwrite the answer that was just recorded.
            current_key = None
            buffer = []
        buffer.append(token)

    # Store the last question-answer pair.
    if buffer and current_key:
        pairs[current_key] = " ".join(buffer)

    return pairs


def calculate_similarity_scores(evaluator_content, student_content):
    """Score a student's answers against the evaluator's reference answers.

    Both texts are parsed into question/answer pairs. For every evaluator
    question the reference answer and the student's answer are passed to
    ``check_similarity``. A question the student did not answer is recorded
    with an empty answer, ``per_mark`` 0.0 and ``Similarity`` ``'unanswered'``
    instead of raising ``KeyError``.

    Scoring: an ``'entailment'`` row contributes its full score, a
    ``'neutral'`` row one third of it, and ``'contradiction'`` or
    ``'unanswered'`` rows contribute nothing. ``Total_Accuracy`` is the sum
    divided by the number of evaluator questions (0.0 when there are none).

    Args:
        evaluator_content: Text holding the reference questions and answers.
        student_content: Text holding the student's questions and answers.

    Returns:
        A DataFrame with the columns ``Evaluator``, ``Student``, ``per_mark``,
        ``Similarity`` and ``Total_Accuracy`` (the same value on every row).
        The frame is also written to ``output.csv`` in the working directory.
    """
    eval_dic = _parse_question_answers(evaluator_content)
    std_dic = _parse_question_answers(student_content)

    data = {'Evaluator': [], 'Student': [], 'per_mark': [], 'Similarity': []}
    total_entailment = 0.0
    total_neutral = 0.0

    for question, reference_answer in eval_dic.items():
        student_answer = std_dic.get(question)
        if student_answer is None:
            # The student file has no answer for this question.
            student_answer = ''
            verdict = 'unanswered'
            percentage = 0.0
        else:
            verdict, score_text = check_similarity(reference_answer, student_answer)
            # check_similarity returns the score as text ending in '%'.
            percentage = float(score_text.strip('%'))

        data['Student'].append(student_answer)
        data['Evaluator'].append(reference_answer)
        data['per_mark'].append(percentage)
        data['Similarity'].append(verdict)

        if verdict == 'entailment':
            total_entailment += percentage
        elif verdict == 'neutral':
            total_neutral += percentage / 3

    # Every evaluator question counts towards the average, answered or not.
    num = len(data['Similarity'])
    total_accuracy = (total_entailment + total_neutral) / num if num else 0.0

    # Add total accuracy to DataFrame (scalar, repeated on every row).
    data['Total_Accuracy'] = total_accuracy

    df = pd.DataFrame(data)
    df.to_csv('output.csv', index=False)
    return df


In [None]:

# # Example inference
# sentence1 = "Two women are observing something together."
# sentence2 = "Two women are standing with their eyes closed."
# result = check_similarity(sentence1, sentence2)
# print(result)

# sentence1 = " The: greenhouse effect is a natural process where specific gases in Earth's atmosphere trap and radiate heat. Solar radiation passes through the atmosphere, warming the Earth's surface. The surface emits infrared radiation, but greenhouse gases like carbon dioxide and methane absorb and re-emit this radiation. This action traps heat, maintaining a temperature suitable for life."
# sentence2 = " The: Carbon dioxide (CO2) is harmful to the environment mainly due to its contribution to the greenhouse effect. Excessive CO2, primarily from human activities like burning fossil fuels, intensifies the natural greenhouse effect, leading to global warming and climate change. This results in more frequent and severe weather events, disruptions to ecosystems, and rising sea levels."
# result = check_similarity(sentence1, sentence2)
# print(result)


In [None]:
# ----from google.colab.output import eval_js
#---- print(eval_js("google.colab.kernel.proxyPort(5000)"))

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from flask import Flask
from flask import request, redirect
from flask import render_template

In [None]:
# app = Flask(__name__,template_folder = '/content/drive/MyDrive/Bert/templates')

In [None]:
# from IPython.display import clear_output


# @app.route('/',methods=['GET','POST'])
# def start():
#     if request.method =='POST':
#         if 'file1' in request.files and 'file2' in request.files:
#             evaluator = request.files['file1']
#             student = request.files['file2']

#             evaluator_path = "/content/drive/MyDrive/Bert/history/evaluator.txt"
#             student_path = "/content/drive/MyDrive/Bert/history/student.txt"

#             evaluator.save(evaluator_path)
#             student.save(student_path)

#             with open(evaluator_path, 'rb') as f1:
#                     content1 = f1.read()
#                     # evaluator_content = f1.read()
#             evaluator = {'evaluator.txt': content1}

#             with open(student_path, 'rb') as f2:
#                     content2 = f2.read()
#                     # student_content = f2.read()
#             student = {'student.txt': content2}

#             print(evaluator)
#             print(student)

#             for ky in evaluator.keys():
#                   evaluator_content = evaluator[ky].decode('utf-8')
#             for ky in student.keys():
#                   student_content = student[ky].decode('utf-8')
#             clear_output()
#             result_df = calculate_similarity_scores(evaluator_content, student_content)
#             result_df.to_html('/content/drive/MyDrive/Bert/templates/output.html', index=False)
#             print(result_df)

#             # result_df.to_csv(file_path, index=False).save("/content/drive/MyDrive/Bert/history/output.csv")
#             # result_df.to_csv(file_path, index=False)
#             return render_template('output.html')

#         else:
#             return "<h3>Sorry we are unable to detect any files. Please Try Again</h3>"

#     return render_template('index.html')

In [None]:
# def output():
#   with open('/content/drive/MyDrive/Bert/history/evaluator.txt','rw') as f:
#     contents=f.read()
#     print(contents)

In [None]:
# !pip install --upgrade google-colab

In [None]:

# evaluator_path = "/content/drive/MyDrive/Bert/history/evaluator.txt"
# student_path = "/content/drive/MyDrive/Bert/history/student.txt"


# with open(evaluator_path, 'rb') as file:
#         content = file.read()

# evaluator = {'evaluator.txt': content}

# with open(student_path, 'rb') as file:
#         content = file.read()

# student = {'student.txt': content}


# for ky in evaluator.keys():
#       evaluator_content = evaluator[ky].decode('utf-8')
# for ky in student.keys():
#       student_content = student[ky].decode('utf-8')

# evaluator_content

# student_content

In [None]:
# from google.colab import files
# evaluator = files.upload()
# student1 = files.upload()
# student2 = files.upload()
# student3 = files.upload()
# student4 = files.upload()
# for ky in evaluator.keys():
#       evaluator_content = evaluator[ky].decode('utf-8')
# for ky in student1.keys():
#       student_content1 = student1[ky].decode('utf-8')
# for ky in student2.keys():
#       student_content2= student2[ky].decode('utf-8')
# for ky in student3.keys():
#       student_content3= student3[ky].decode('utf-8')
# for ky in student4.keys():
#       student_content4 = student4[ky].decode('utf-8')

In [None]:
# result_df = calculate_similarity_scores(evaluator_content, student_content1)
# result_df.to_csv('output.csv', index=False)

# result_df1 = calculate_similarity_scores(evaluator_content, student_content2)
# result_df1.to_csv('output.csv', index=False)

# result_df2 = calculate_similarity_scores(evaluator_content, student_content3)
# result_df2.to_csv('output.csv', index=False)

# result_df3 = calculate_similarity_scores(evaluator_content, student_content4)
# result_df3.to_csv('output.csv', index=False)

In [None]:
# result_df

In [None]:
# result_df1

In [None]:
# result_df2

In [None]:
# result_df3

In [None]:
# print("TensorFlow version:", tf.__version__)
# print("Transformers version:", transformers.__version__)

In [None]:
# sentence1="Answer : The water cycle , or hydrological cycle , consists of several stages . Evaporation occurs when water from the Earth s surface turns into vapor due to heat . Condensation involves the formation of clouds as water vapor cools and turns into liquid . Precipitation occurs when water droplets or ice crystals fall from clouds as rain , snow , sleet , or hail . Runoff involves the flow of water on the Earth s surface back into oceans , rivers , or lakes ."
# sentence2="Answer : It is a cycle that made up of water "
# result = check_similarity(sentence1, sentence2)
# print(result)

In [None]:
from google.colab.output import eval_js
# Print what Colab's kernel.proxyPort(5000) evaluates to — presumably the proxied
# URL for the Flask server on port 5000; TODO confirm.
print(eval_js("google.colab.kernel.proxyPort(5000)"))

In [None]:
# Flask application; templates and static files are read from the Bert folder
# on the mounted Google Drive.
app = Flask(__name__,template_folder = '/content/drive/MyDrive/Bert/templates',static_folder='/content/drive/MyDrive/Bert/static')

from IPython.display import clear_output

@app.route('/',methods=['GET','POST'])
def start():
    """Serve the upload form and grade an uploaded evaluator/student file pair.

    GET returns ``index.html``. POST expects the form files ``file1``
    (evaluator) and ``file2`` (student). Both are saved to the Drive history
    folder, decoded as UTF-8 and passed to ``calculate_similarity_scores``.
    The result table is written to ``templates/output.html`` and returned as
    the response body.

    Returns:
        The rendered form, the result table as HTML, or an ``<h3>`` error
        message when files are missing or cannot be decoded.
    """
    if request.method != 'POST':
        return render_template('index.html')

    evaluator = request.files.get('file1')
    student = request.files.get('file2')
    # A form submitted without choosing a file still sends the field, with an
    # empty filename; treat that like a missing file.
    if (
        evaluator is None
        or student is None
        or not evaluator.filename
        or not student.filename
    ):
        return "<h3>Sorry we are unable to detect any files. Please Try Again</h3>"

    evaluator_path = "/content/drive/MyDrive/Bert/history/evaluator.txt"
    student_path = "/content/drive/MyDrive/Bert/history/student.txt"

    # Keep a copy of the latest uploads on Drive.
    evaluator.save(evaluator_path)
    student.save(student_path)

    try:
        with open(evaluator_path, 'rb') as f1:
            evaluator_content = f1.read().decode('utf-8')
        with open(student_path, 'rb') as f2:
            student_content = f2.read().decode('utf-8')
    except UnicodeDecodeError:
        return "<h3>Sorry we are unable to read the uploaded files. Please upload UTF-8 text files.</h3>", 400

    clear_output()
    result_df = calculate_similarity_scores(evaluator_content, student_content)
    result_html = result_df.to_html(index=False)

    # Keep writing the table next to the templates as before.
    with open('/content/drive/MyDrive/Bert/templates/output.html', 'w', encoding='utf-8') as html_file:
        html_file.write(result_html)
    print(result_df)

    # Return the fresh HTML directly: outside debug mode Jinja caches
    # 'output.html' after its first render, so render_template() would keep
    # serving the results of the first upload.
    return result_html


@app.route('/show')
def shows():
    """Render the ``index.html`` template for the ``/show`` URL."""
    page = render_template('index.html')
    return page


@app.route('/about')
def about():
    """Render the ``about.html`` template for the ``/about`` URL."""
    page = render_template('about.html')
    return page

# Start Flask's built-in server with its default settings when this code runs
# as the main module.
if __name__ == "__main__":
    # !ngrok http 5000
    app.run()