In [None]:
!! pip install transformers datasets evaluate

['Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/',

In [None]:
from google.colab import drive
# Mount Google Drive into the Colab filesystem so the dataset files are reachable.
drive.mount('/content/drive/')

import os
# Switch into the Drive folder that holds the labelled JSON files, list its
# contents as a sanity check, then switch back to /content so that relative
# paths used later (e.g. the checkpoint directory) resolve there.
path="/content/drive/MyDrive/ML"
os.chdir(path)
print(os.listdir(path))
os.chdir('/content')

Drive already mounted at /content/drive/; to attempt to forcibly remount, call drive.mount("/content/drive/", force_remount=True).
['backbone_labeled.json', 'backbone_labeled1.json', 'qa_labeled1.json', 'backbone_labeled2.json', 'qa_labeled.json', 'ee66fe54-da44-411e-a67e-19bacd205646.json']


In [None]:
# Class index -> human-readable label, and the inverse mapping derived from it
# so the two dictionaries cannot drift apart.
id2label = dict(enumerate(("NOT AN ANSWER", "ANSWER")))
label2id = {label: index for index, label in id2label.items()}

In [None]:
from transformers import AutoTokenizer

# Tokenizer matching the checkpoint that is fine-tuned later in the notebook
# (the same model identifier is passed to TFAutoModelForSequenceClassification).
tokenizer = AutoTokenizer.from_pretrained("ai-forever/sbert_large_mt_nlu_ru")

In [None]:
def preprocess_qa_data_function(data):
    """Tokenize question/answer pairs for the sequence-pair classifier.

    Args:
        data: mapping with ``'question'`` and ``'answer'`` entries; both are
            passed to the tokenizer as the first and second text argument.

    Returns:
        The tokenizer output, with every pair truncated and padded to
        128 tokens.
    """
    questions = data['question']
    answers = data['answer']
    return tokenizer(
        questions,
        answers,
        truncation=True,
        max_length=128,
        padding="max_length",
    )

In [None]:
import random

def extract_answer_text(answer):
  """Return the value stored under the ``'text'`` key of an answer record."""
  text = answer['text']
  return text


def extract_answer_label(answer):
  """Return the value stored under the ``'is_good_answer'`` key of an answer record."""
  label = answer['is_good_answer']
  return label


def extract_answers_texts(answers):
  """Return a list with the ``'text'`` of every answer record, in order."""
  return [extract_answer_text(answer) for answer in answers]


def extract_answers_labels(answers):
  """Return a list with the ``'is_good_answer'`` value of every answer record, in order."""
  return [extract_answer_label(answer) for answer in answers]


def is_not_empty_str(line):
  """Return True when ``line`` still has characters after stripping whitespace."""
  stripped = line.strip()
  return stripped != ''


def is_not_empty_answer_row(row):
  """Return True when the record's ``'text'`` is non-empty after stripping whitespace."""
  stripped_text = row['text'].strip()
  return stripped_text != ''


def filter_empty_answers(answers):
  """Return a list of the answer strings that are not blank, preserving order."""
  return [answer for answer in answers if is_not_empty_str(answer)]


def filter_empty_answer_rows(answers):
  """Return a list of the answer records whose ``'text'`` is not blank, preserving order."""
  return [row for row in answers if is_not_empty_answer_row(row)]


def check_answers_correct(answers):
  """Return True when at least one of the answer strings is not blank."""
  non_empty = filter_empty_answers(answers)
  return len(non_empty) > 0


def get_random_answer_text(data_json):
  """Return the text of one randomly chosen non-empty answer from the dataset.

  A row is drawn uniformly at random and redrawn until it has at least one
  non-blank answer text; one of that row's non-blank texts is then drawn
  uniformly.

  Args:
    data_json: sequence of rows; each row's ``'answers'`` entry is an
      iterable of answer records that carry a ``'text'`` string.

  Returns:
    The selected answer text.

  Raises:
    ValueError: if ``data_json`` is empty or no row has a non-blank answer
      text. The original recursive version never terminated cleanly in the
      latter case.
  """
  # Guard first: without a usable row the redraw loop below could never end.
  # `any` stops at the first usable row, so this is cheap on real data.
  has_usable_row = any(
      check_answers_correct(extract_answers_texts(row['answers']))
      for row in data_json
  )
  if not has_usable_row:
    raise ValueError("data_json contains no row with a non-empty answer text")

  # Iterative redraw instead of recursion: an unlucky streak of rows without
  # usable answers can no longer hit the interpreter's recursion limit.
  while True:
    row = random.choice(data_json)
    answers = filter_empty_answers(extract_answers_texts(row['answers']))
    if answers:
      return random.choice(answers)


def is_answers_contains_good_answer(answers):
  """Return True when at least one answer record has ``'is_good_answer'`` equal to 1."""
  labels = extract_answers_labels(answers)
  has_good_answer = 1 in labels
  return has_good_answer


def create_pair(question, answer, label):
  """Bundle a question, an answer and its label into one training record.

  Returns a dict with the keys ``'question'``, ``'answer'`` and ``'label'``
  (in that order).
  """
  return dict(question=question, answer=answer, label=label)


In [None]:
import json
import pandas as pd
from sklearn.model_selection import train_test_split

# Seed the stdlib RNG used by get_random_answer_text so the randomly sampled
# negative answers (and therefore the whole dataset) are identical on every
# Restart & Run All; train_test_split below is already seeded with the same value.
random.seed(21)

# `with` closes the file handle; the encoding is explicit because the file
# holds Cyrillic text and must not depend on the platform default.
with open("/content/drive/MyDrive/ML/ee66fe54-da44-411e-a67e-19bacd205646.json", encoding="utf-8") as dataset_file:
  jsn = json.load(dataset_file)

# Upper bound on redraws when a randomly sampled negative turns out to be one
# of the question's own answers.
max_negative_draws = 10

# One entry per question: {'pos': <pair labelled 1>, 'neg': <pair labelled 0>}.
# Keeping both pairs in one entry means the train/test split never separates
# the positive and negative pair of the same question.
labeled_pairs = []

for row in jsn:
  question = row['question']['text']
  answers = row['answers']
  if not is_answers_contains_good_answer(answers):
    continue

  # First non-empty bad answer and first non-empty good answer of this row.
  neg_answer = None
  pos_answer = None
  for answer_row in filter_empty_answer_rows(answers):
    if neg_answer is None and answer_row['is_good_answer'] == 0:
      neg_answer = answer_row['text']
    if pos_answer is None and answer_row['is_good_answer'] == 1:
      pos_answer = answer_row['text']

  if pos_answer is None:
    # Every good answer of this row had an empty text.
    continue

  if neg_answer is None:
    # No labelled bad answer: borrow a random answer from the dataset. A draw
    # may land on this very row, so reject the row's own texts; otherwise the
    # same question/answer text could be labelled both 1 and 0.
    own_texts = set(extract_answers_texts(answers))
    for attempt in range(max_negative_draws):
      candidate = get_random_answer_text(jsn)
      if candidate not in own_texts:
        neg_answer = candidate
        break

  if neg_answer is None:
    # No distinct negative could be found; skip rather than emit a
    # contradictory pair.
    continue

  labeled_pairs.append({
      'pos': create_pair(question, pos_answer, 1),
      'neg': create_pair(question, neg_answer, 0),
  })

train_pairs, test_pairs = train_test_split(labeled_pairs, train_size=0.7, random_state=21)

# Flatten to one record per pair, positive first and then its negative.
train_pairs_flat = [pair for entry in train_pairs for pair in (entry['pos'], entry['neg'])]
test_pairs_flat = [pair for entry in test_pairs for pair in (entry['pos'], entry['neg'])]

In [None]:
#data_to_check = js_df
#print(data_to_check.head(10))
#print(len(data_to_check))

In [None]:

#_data_to_check = data_to_check.copy().dropna(subset=['question', 'answer', 'label'])

#_train, _test = train_test_split(_data_to_check.copy(), train_size=0.7, random_state=21)
#_data_labels_train = _train['label']
#_data_features1_train = _train['question']
#_data_features2_train = _train['answer']

#_data_labels_test = _test['label']
#_data_features1_test = _test['question']
#_data_features2_test = _test['answer']
# Build the frames directly from the lists of pair dicts. The previous
# json.dumps -> pd.read_json round trip passed a literal JSON string (deprecated
# in recent pandas) and let read_json re-infer column dtypes from the text.
_train = pd.DataFrame(train_pairs_flat)
_test = pd.DataFrame(test_pairs_flat)
print(_train.head())
print(_test.head())

                                            question  \
0  У вас на сайте выставлена такая информация о Г...   
1  У вас на сайте выставлена такая информация о Г...   
2  Здравствуйте, подскажите, пожалуйста. Если я п...   
3  Здравствуйте, подскажите, пожалуйста. Если я п...   
4  Здравствуйте. \nСын подает документы в пять ВУ...   

                                              answer  label  
0  грант в целом дается только на год, потом его ...      1  
1  по рейтингу за 1 курс выдаются гранты на 2 кур...      0  
2  Если совсем другой вуз, не КФУ, то  забирать н...      1  
3  Здравствуйте, цены на платку могут поменяться ...      0  
4  согласие на зачисление можно подать только в 1...      1  
                                            question  \
0  Для заселения из справок нужны только справка ...   
1  Для заселения из справок нужны только справка ...   
2  Добрый день, подскажите, пожалуйста, почему ещ...   
3  Добрый день, подскажите, пожалуйста, почему ещ...   
4  Здравств

In [None]:
from datasets import Dataset

# Wrap the pandas frames as Hugging Face datasets and tokenize them in batches.
train_hg_dataset = Dataset.from_pandas(_train)
test_hg_dataset = Dataset.from_pandas(_test)

train_hg_dataset_tokenized = train_hg_dataset.map(preprocess_qa_data_function, batched=True)
test_hg_dataset_tokenized = test_hg_dataset.map(preprocess_qa_data_function, batched=True)

# Sanity checks: a few tokenized rows, one example decoded back to text, and
# the test labels.
print(train_hg_dataset_tokenized[:5])
print(tokenizer.decode(train_hg_dataset_tokenized[0]["input_ids"]))
# `[:]` rather than `[:-1]`, so the label of the last test example is shown too.
print(test_hg_dataset_tokenized[:]['label'])

Map:   0%|          | 0/210 [00:00<?, ? examples/s]

Map:   0%|          | 0/92 [00:00<?, ? examples/s]

{'question': ['У вас на сайте выставлена такая информация о Гранте:\n"Грант предоставляется в соответствии с Подпрограммой «Государственная поддержка развития экономической среды и человеческого капитала в сфере информационных технологий в Республике Татарстан на 2014 – 2023 годы"\nТ.е. если в 22 году поступит на грант, то возможность учиться бесплатно только на 1 год? Или я что-то неверно поняла?', 'У вас на сайте выставлена такая информация о Гранте:\n"Грант предоставляется в соответствии с Подпрограммой «Государственная поддержка развития экономической среды и человеческого капитала в сфере информационных технологий в Республике Татарстан на 2014 – 2023 годы"\nТ.е. если в 22 году поступит на грант, то возможность учиться бесплатно только на 1 год? Или я что-то неверно поняла?', 'Здравствуйте, подскажите, пожалуйста. Если я подал к вам оригинал аттестата, могу ли я, его не забрав, подать согласие на зачисление в другой вуз ? Или обязательно нужно забирать оригинал', 'Здравствуйте, по

In [None]:
from transformers import DataCollatorWithPadding

# Batch collator configured to return TensorFlow tensors.
# NOTE(review): preprocess_qa_data_function already pads every example to
# max_length=128, so the collator's own padding presumably has nothing left to
# do here — confirm.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer, return_tensors="tf")

In [None]:
import evaluate

# Load the four evaluation metrics by name; each object is used by the
# compute_* helper of the same name defined in the next cell.
accuracy = evaluate.load("accuracy")
precision = evaluate.load("precision")
recall = evaluate.load("recall")
f1 = evaluate.load("f1")

In [None]:
import numpy as np


def compute_accuracy(eval_pred):
    """Score arg-max class predictions with the ``accuracy`` metric.

    Args:
        eval_pred: pair unpacked as ``(predictions, labels)``; the predictions
            are reduced to class indices with an arg-max over axis 1.

    Returns:
        Whatever ``accuracy.compute`` returns for those predictions and labels.
    """
    scores, references = eval_pred
    predicted_classes = np.argmax(scores, axis=1)
    return accuracy.compute(predictions=predicted_classes, references=references)


def compute_precision(eval_pred):
    """Score arg-max class predictions with the ``precision`` metric.

    Args:
        eval_pred: pair unpacked as ``(predictions, labels)``; the predictions
            are reduced to class indices with an arg-max over axis 1.

    Returns:
        Whatever ``precision.compute`` returns for those predictions and labels.
    """
    scores, references = eval_pred
    predicted_classes = np.argmax(scores, axis=1)
    return precision.compute(predictions=predicted_classes, references=references)


def compute_recall(eval_pred):
    """Score arg-max class predictions with the ``recall`` metric.

    Args:
        eval_pred: pair unpacked as ``(predictions, labels)``; the predictions
            are reduced to class indices with an arg-max over axis 1.

    Returns:
        Whatever ``recall.compute`` returns for those predictions and labels.
    """
    scores, references = eval_pred
    predicted_classes = np.argmax(scores, axis=1)
    return recall.compute(predictions=predicted_classes, references=references)


def compute_f1(eval_pred):
    """Score arg-max class predictions with the ``f1`` metric.

    Args:
        eval_pred: pair unpacked as ``(predictions, labels)``; the predictions
            are reduced to class indices with an arg-max over axis 1.

    Returns:
        Whatever ``f1.compute`` returns for those predictions and labels.
    """
    scores, references = eval_pred
    predicted_classes = np.argmax(scores, axis=1)
    return f1.compute(predictions=predicted_classes, references=references)

In [None]:
from transformers import create_optimizer
import tensorflow as tf

batch_size = 8
# Must equal the `epochs` value passed to `model.fit` (20). The learning-rate
# schedule built below decays over exactly `total_train_steps` optimizer steps;
# with the previous value of 3 the decay finished after the third epoch, so any
# later epoch of the 20-epoch fit ran at the schedule's final learning rate.
num_epochs = 20
# Only full batches are counted.
batches_per_epoch = len(_train) // batch_size
total_train_steps = batches_per_epoch * num_epochs
# Optimizer plus its learning-rate schedule: start at 2e-5, no warm-up.
optimizer, schedule = create_optimizer(init_lr=2e-5, num_warmup_steps=0, num_train_steps=total_train_steps)

In [None]:
from transformers import TFAutoModelForSequenceClassification

# Load the pretrained checkpoint with a 2-class sequence-classification head and
# pass the label maps defined earlier. The load log below reports the
# `classifier` layer as newly initialized, i.e. it has to be trained here.
model = TFAutoModelForSequenceClassification.from_pretrained(
    "ai-forever/sbert_large_mt_nlu_ru", num_labels=2, id2label=id2label, label2id=label2id
)

All model checkpoint layers were used when initializing TFBertForSequenceClassification.

Some layers of TFBertForSequenceClassification were not initialized from the model checkpoint at ai-forever/sbert_large_mt_nlu_ru and are newly initialized: ['classifier']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
# Parameter counts before freezing.
model.summary()
# Keep only the last three encoder blocks trainable and freeze every earlier
# one. This loop touches encoder blocks only; the embeddings and the
# classification head are left as they are.
encoder_layers = model.bert.encoder.layer
first_trainable = len(encoder_layers) - 3
for i, encoder_layer in enumerate(encoder_layers):
  encoder_layer.trainable = i >= first_trainable
# Parameter counts after freezing.
model.summary()

Model: "tf_bert_for_sequence_classification"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 bert (TFBertMainLayer)      multiple                  426908672 
                                                                 
 dropout_73 (Dropout)        multiple                  0         
                                                                 
 classifier (Dense)          multiple                  2050      
                                                                 
Total params: 426,910,722
Trainable params: 426,910,722
Non-trainable params: 0
_________________________________________________________________
Model: "tf_bert_for_sequence_classification"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 bert (TFBertMainLayer)      multiple                  426908672 
                                      

In [None]:
# Wrap the tokenized Hugging Face datasets as batched datasets for Keras,
# using the shared batch size and collator. Training batches are shuffled.
train_dataset = model.prepare_tf_dataset(
    train_hg_dataset_tokenized,
    shuffle=True,
    batch_size=batch_size,
    collate_fn=data_collator,
)

# Evaluation order is kept fixed (shuffle=False): later cells compare the
# predictions on this dataset with test_hg_dataset_tokenized['label'] position
# by position.
test_dataset = model.prepare_tf_dataset(
    test_hg_dataset_tokenized,
    shuffle=False,
    batch_size=batch_size,
    collate_fn=data_collator,
)

You're using a BertTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


In [None]:
import tensorflow as tf

# Compile with the optimizer created above and track accuracy during fit.
# NOTE(review): no `loss=` is passed; presumably the model falls back to its
# built-in loss — confirm against the Transformers documentation.
model.compile(optimizer=optimizer, metrics=["accuracy"])

In [None]:
from transformers.keras_callbacks import KerasMetricCallback

# One KerasMetricCallback per metric; each is given its compute_* function and
# test_dataset as the evaluation dataset.
val_accuracy_callback = KerasMetricCallback(metric_fn=compute_accuracy, eval_dataset=test_dataset)
val_precision_callback = KerasMetricCallback(metric_fn=compute_precision, eval_dataset=test_dataset)
val_recall_callback = KerasMetricCallback(metric_fn=compute_recall, eval_dataset=test_dataset)
val_f1_callback = KerasMetricCallback(metric_fn=compute_f1, eval_dataset=test_dataset)
# NOTE(review): val_accuracy_callback is created but not added to this list.
# Accuracy is already requested through metrics=["accuracy"] in model.compile,
# so the omission looks deliberate — confirm.
callbacks = [val_precision_callback, val_recall_callback, val_f1_callback]

In [None]:
# Relative path: resolved against the current working directory.
checkpoint_path = 'training_1/cp.ckpt'
# Write the weights only, and only when val_loss improves.
cp_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    save_best_only=True,
    verbose=1,
    monitor='val_loss',
)
# Stop after 3 epochs without a val_loss improvement of at least 0.001 and
# restore the best weights seen.
es_callback = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    min_delta=0.001,
    patience=3,
    verbose=0,
    mode='auto',
    baseline=None,
    restore_best_weights=True,
)
# Rebuild the list instead of appending in place: re-running this cell then
# replaces any checkpoint / early-stopping callbacks from a previous run
# rather than stacking duplicates onto `callbacks`.
callbacks = [
    cb for cb in callbacks
    if not isinstance(cb, (tf.keras.callbacks.ModelCheckpoint, tf.keras.callbacks.EarlyStopping))
] + [cp_callback, es_callback]

In [None]:
# Train with validation on test_dataset; `callbacks` carries the metric,
# checkpoint and early-stopping callbacks assembled in the previous cells.
# NOTE(review): `epochs` should stay consistent with the `num_epochs` used to
# size the learning-rate schedule passed to create_optimizer — confirm.
history = model.fit(x=train_dataset, validation_data=test_dataset, epochs=20, callbacks=callbacks)

Epoch 1/20
Epoch 1: val_loss improved from inf to 0.66980, saving model to training_1/cp.ckpt
Epoch 2/20
Epoch 2: val_loss improved from 0.66980 to 0.64251, saving model to training_1/cp.ckpt
Epoch 3/20
Epoch 3: val_loss improved from 0.64251 to 0.63476, saving model to training_1/cp.ckpt
Epoch 4/20


In [None]:
# Reload the weights written by the ModelCheckpoint callback, which was set to
# keep only the checkpoint with the best val_loss.
model.load_weights(checkpoint_path)
# NOTE(review): this recompile omits the metrics=["accuracy"] that the earlier
# compile passed — confirm that is intended.
model.compile(optimizer=optimizer)

In [None]:
import matplotlib.pyplot as plt


def plot_graphs(history, metric):
  """Plot a training metric and its validation counterpart against the epoch index.

  Args:
    history: object whose ``history`` attribute maps metric names to
      per-epoch value sequences (as returned by ``model.fit``).
    metric: name of the training metric; the validation series is read from
      the key ``'val_' + metric``.
  """
  val_metric = 'val_'+metric
  curves = history.history
  plt.plot(curves[metric])
  plt.plot(curves[val_metric], '')
  plt.xlabel("Epochs")
  plt.ylabel(metric)
  plt.legend([metric, val_metric])

In [None]:
# Run inference over the unshuffled test dataset.
pred_logits = model.predict(test_dataset)

In [None]:
# Inspect the raw prediction output before reducing it to class indices.
print(pred_logits)

In [None]:
# Ground-truth labels taken from the tokenized test set, and the predicted
# class per example (arg-max over axis 1 of the logits).
labels = test_hg_dataset_tokenized['label']
predictions = np.argmax(pred_logits.logits, axis=1)
print(labels)
print(predictions)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Confusion matrix over classes 0 and 1, with `labels` passed as the true
# values and `predictions` as the predicted values.
cm_real = confusion_matrix(labels, predictions, labels=[0, 1])
disp = ConfusionMatrixDisplay(confusion_matrix=cm_real, display_labels=[0, 1])
disp.plot()
plt.show()

In [None]:
# Training curves side by side: accuracy in the first subplot, loss in the second.
plt.figure(figsize=(16, 8))
plt.subplot(1, 2, 1)
plot_graphs(history, 'accuracy')
# Cap the accuracy axis at 1.
plt.ylim(None, 1)
plt.subplot(1, 2, 2)
plot_graphs(history, 'loss')
# Start the loss axis at 0.
plt.ylim(0, None)

In [None]:
from sklearn.metrics import classification_report

# Display names ordered by class index; they mirror id2label
# (0 -> NOT AN ANSWER, 1 -> ANSWER) and must be kept in sync with it.
target_names = ['NOT AN ANSWER', 'ANSWER']
print(classification_report(labels, predictions, target_names=target_names))

In [None]:
# Keras cannot serialise a subclassed model such as this Transformers model to a
# single HDF5 file via `model.save('...h5')`. `save_pretrained` writes the
# weights and config into a directory that `from_pretrained` can load back;
# the tokenizer is saved alongside so the directory is self-contained.
model.save_pretrained('BERT_QA')
tokenizer.save_pretrained('BERT_QA')

In [None]:
#import shutil
#output_filename = "bert_qa"
##dir_name = "BERT_QA"
#shutil.make_archive(output_filename, 'zip', dir_name)

In [None]:
from google.colab import files
#files.download('bert_qa.zip')