In [None]:
import numpy as np
import re
from tensorflow.keras.layers import MultiHeadAttention, Embedding, LSTM, Dense, Bidirectional, Dropout, Input, TimeDistributed
from tensorflow.keras.models import Model
from tensorflow.keras.initializers import Constant
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

def load_and_preprocess_data(file_path):
    """Read a dialogue file and convert it to padded token-id arrays.

    The file is scanned for records of the form
    ``<Patient>...<Doctor>...`` where the doctor reply runs up to the next
    newline (or to the end of the file for the final record). A new
    ``Tokenizer`` (10000-word limit, ``<OOV>`` token) is fitted on every
    patient message and doctor reply, and both sides are converted to integer
    sequences that are padded/truncated at the end to 215 tokens.

    Args:
        file_path: Path of the UTF-8 encoded dialogue file.

    Returns:
        A tuple ``(tokenizer, patient_array, doctor_array)`` where the two
        arrays hold one padded sequence per dialogue pair.

    Raises:
        ValueError: If the file contains no ``<Patient>``/``<Doctor>`` pair.
    """
    # Only the read needs the file handle; release it before heavy processing.
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    # `\Z` lets the last record match even without a trailing newline;
    # records that do end in a newline match exactly as before.
    pattern = re.compile(r'<Patient>(.*?)<Doctor>(.*?)(?:\n|\Z)', re.DOTALL)
    matches = pattern.findall(content)
    if not matches:
        # Without this guard `zip(*matches)` fails with an opaque unpack error.
        raise ValueError(
            f"No <Patient>/<Doctor> dialogue pairs found in {file_path!r}"
        )
    patient_msgs, doctor_replies = zip(*matches)

    tokenizer = Tokenizer(num_words=10000, oov_token="<OOV>")
    # Both sides share one vocabulary, so fit on the concatenated tuples.
    tokenizer.fit_on_texts(patient_msgs + doctor_replies)

    patient_seqs = tokenizer.texts_to_sequences(patient_msgs)
    doctor_seqs = tokenizer.texts_to_sequences(doctor_replies)

    max_sequence_length = 215
    patient_seqs_padded = pad_sequences(
        patient_seqs, maxlen=max_sequence_length, padding='post', truncating='post'
    )
    doctor_seqs_padded = pad_sequences(
        doctor_seqs, maxlen=max_sequence_length, padding='post', truncating='post'
    )

    return tokenizer, np.array(patient_seqs_padded), np.array(doctor_seqs_padded)

def load_glove_embeddings(glove_file, embedding_dim):
    """Load word vectors from a GloVe-style text file.

    Each usable line holds a word followed by ``embedding_dim`` numbers,
    separated by whitespace. The last ``embedding_dim`` fields are parsed as
    the vector and everything before them is taken as the word, so words that
    themselves contain whitespace no longer break float parsing (their parts
    are re-joined with single spaces).

    Args:
        glove_file: Path of the UTF-8 encoded embeddings file.
        embedding_dim: Number of vector components per line; must match the
            dimensionality of the file.

    Returns:
        Dict mapping each word to a ``float32`` NumPy vector of length
        ``embedding_dim``.

    Raises:
        ValueError: If a vector field cannot be parsed as a float.
    """
    embeddings_index = {}
    with open(glove_file, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            # Skip blank lines and lines too short to hold a word plus a full
            # vector (e.g. a "<count> <dim>" header) instead of crashing or
            # storing a vector of the wrong length.
            if len(values) <= embedding_dim:
                continue
            split_at = len(values) - embedding_dim
            word = ' '.join(values[:split_at])
            coefs = np.asarray(values[split_at:], dtype='float32')
            embeddings_index[word] = coefs
    return embeddings_index

def create_embedding_matrix(word_index, embeddings_index, vocab_size, embedding_dim):
    """Build the initial weight matrix for an embedding layer.

    Row ``i`` receives the pretrained vector of the word whose index is ``i``.
    Words whose index is ``vocab_size`` or larger, and words missing from
    ``embeddings_index``, are skipped, so their rows stay all-zero.

    Args:
        word_index: Mapping of word -> integer index.
        embeddings_index: Mapping of word -> pretrained vector.
        vocab_size: Number of rows in the returned matrix.
        embedding_dim: Number of columns in the returned matrix.

    Returns:
        A ``(vocab_size, embedding_dim)`` NumPy array.
    """
    matrix = np.zeros((vocab_size, embedding_dim))
    for token, row in word_index.items():
        if row >= vocab_size:
            continue
        vector = embeddings_index.get(token)
        if vector is None:
            continue
        matrix[row] = vector
    return matrix

def build_attention_model(vocab_size, max_length, embedding_dim, embedding_matrix):
    """Assemble and compile the BiLSTM + self-attention token classifier.

    Layer stack: trainable embedding initialised from ``embedding_matrix`` ->
    BiLSTM(128) -> Dropout(0.3) -> BiLSTM(128) -> 4-head self-attention ->
    per-timestep Dense(128) -> softmax over ``vocab_size`` at every position.

    Args:
        vocab_size: Size of the input vocabulary and of the output softmax.
        max_length: Length of the input token-id sequences.
        embedding_dim: Width of the embedding vectors.
        embedding_matrix: Initial embedding weights.

    Returns:
        A Keras ``Model`` compiled with Adam (learning rate 0.001),
        sparse categorical cross-entropy loss and an accuracy metric.
    """
    token_ids = Input(shape=(max_length,))

    embedding = Embedding(
        input_dim=vocab_size,
        output_dim=embedding_dim,
        embeddings_initializer=Constant(embedding_matrix),
        trainable=True,
    )
    hidden = embedding(token_ids)

    # Two stacked bidirectional encoders with dropout in between.
    hidden = Bidirectional(LSTM(128, return_sequences=True))(hidden)
    hidden = Dropout(0.3)(hidden)
    hidden = Bidirectional(LSTM(128, return_sequences=True))(hidden)

    # Self-attention: the encoder output serves as both query and value.
    self_attention = MultiHeadAttention(num_heads=4, key_dim=64)
    attended = self_attention(query=hidden, value=hidden)

    projected = TimeDistributed(Dense(128))(attended)
    token_probs = Dense(vocab_size, activation='softmax')(projected)

    model = Model(inputs=token_ids, outputs=token_probs)
    model.compile(
        optimizer=Adam(learning_rate=0.001),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )
    return model

def generate_response(model, tokenizer, text, max_length):
    """Produce a reply string for ``text`` from the model's per-position argmax.

    The text is tokenized and padded/truncated at the end to ``max_length``,
    run through ``model.predict``, and the highest-scoring token id at each
    position is mapped back to a word. Ids that are not greater than 0 are
    dropped; ids missing from ``tokenizer.index_word`` contribute an empty
    string.

    Args:
        model: Object exposing ``predict``.
        tokenizer: Tokenizer exposing ``texts_to_sequences`` and ``index_word``.
        text: Input message.
        max_length: Sequence length passed to ``pad_sequences``.

    Returns:
        The predicted words joined by single spaces.
    """
    encoded = pad_sequences(
        tokenizer.texts_to_sequences([text]),
        maxlen=max_length,
        padding='post',
        truncating='post',
    )
    probabilities = model.predict(encoded)
    # Only one text was submitted, so take the first (only) batch entry.
    token_ids = np.argmax(probabilities, axis=-1)[0]

    words = []
    for token_id in token_ids:
        if token_id > 0:
            words.append(tokenizer.index_word.get(token_id, ''))
    return ' '.join(words)


def _encode_dialogue_file(file_path, tokenizer, max_length):
    """Encode a dialogue file with an already-fitted tokenizer.

    Returns ``(patient_array, doctor_array)`` of padded token-id sequences.
    Raises ``ValueError`` when no ``<Patient>``/``<Doctor>`` pair is found.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    pairs = re.findall(r'<Patient>(.*?)<Doctor>(.*?)(?:\n|\Z)', content, flags=re.DOTALL)
    if not pairs:
        raise ValueError(
            f"No <Patient>/<Doctor> dialogue pairs found in {file_path!r}"
        )
    patient_msgs, doctor_replies = zip(*pairs)

    patient_ids = pad_sequences(
        tokenizer.texts_to_sequences(patient_msgs),
        maxlen=max_length, padding='post', truncating='post',
    )
    doctor_ids = pad_sequences(
        tokenizer.texts_to_sequences(doctor_replies),
        maxlen=max_length, padding='post', truncating='post',
    )
    return np.array(patient_ids), np.array(doctor_ids)


def main(train_file_path, test_file_path):
    """Train the attention model, report test accuracy and print a sample reply.

    Args:
        train_file_path: Dialogue file used to fit the tokenizer and train.
        test_file_path: Dialogue file used for validation and evaluation.
    """
    # These must agree with the limits used by load_and_preprocess_data
    # (10000-word tokenizer, sequences padded to 215 tokens).
    vocab_size = 10000
    max_length = 215
    embedding_dim = 100
    glove_file = 'Datasets/glove.6B.100d.txt'

    tokenizer, train_patient_input, train_doctor_output = load_and_preprocess_data(train_file_path)
    # Encode the test set with the tokenizer fitted on the training data.
    # Fitting a second tokenizer on the test file would assign different ids
    # to the same words, making validation and test metrics meaningless.
    test_patient_input, test_doctor_output = _encode_dialogue_file(
        test_file_path, tokenizer, max_length
    )

    embeddings_index = load_glove_embeddings(glove_file, embedding_dim)
    embedding_matrix = create_embedding_matrix(
        tokenizer.word_index, embeddings_index, vocab_size, embedding_dim
    )

    model = build_attention_model(vocab_size, max_length, embedding_dim, embedding_matrix)
    model.fit(
        train_patient_input,
        train_doctor_output,
        epochs=150,
        batch_size=128,
        validation_data=(test_patient_input, test_doctor_output),
    )

    _, accuracy = model.evaluate(test_patient_input, test_doctor_output)
    print(f"Test Accuracy: {accuracy*100:.2f}%")

    new_patient_input = "throat a bit sore and want to get a good immune booster, especially in light of the virus. please advise?"
    response = generate_response(model, tokenizer, new_patient_input, max_length)
    print("Response:", response)

# Script entry point: train and evaluate on the MedDialog train/test files.
if __name__ == "__main__":
    train_path = 'Datasets/converted_meddialog_en.json'
    test_path = 'Datasets/converted_meddialog_test_en.json'
    main(train_path, test_path)


In [None]:
import nltk

# Download the WordNet corpus; NLTK's METEOR implementation (imported in a
# later cell) looks up synonyms in it.
nltk.download('wordnet')

In [None]:
pip install rouge

In [None]:
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score
from rouge import Rouge

def calculate_bleu(reference_texts, candidate_texts):
    """Compute smoothed corpus-level BLEU on whitespace-tokenized texts.

    Args:
        reference_texts: Reference strings, one per candidate.
        candidate_texts: Candidate strings to score.

    Returns:
        The ``corpus_bleu`` score using ``SmoothingFunction().method1``.
    """
    # corpus_bleu expects a *list* of references per candidate; each
    # candidate here has exactly one.
    references = []
    for reference in reference_texts:
        references.append([reference.split()])
    hypotheses = [candidate.split() for candidate in candidate_texts]

    smoothing = SmoothingFunction().method1
    return corpus_bleu(references, hypotheses, smoothing_function=smoothing)

def calculate_rouge(reference_texts, candidate_texts):
    """Return ROUGE scores averaged over all candidate/reference pairs.

    Args:
        reference_texts: Reference strings.
        candidate_texts: Candidate strings to score.

    Returns:
        Whatever ``Rouge.get_scores(..., avg=True)`` returns.
    """
    # get_scores takes the candidates first, then the references.
    return Rouge().get_scores(candidate_texts, reference_texts, avg=True)

def calculate_meteor(reference_texts, candidate_texts):
    """Return the mean sentence-level METEOR score over paired texts.

    Texts are whitespace-tokenized and paired positionally with ``zip``, so
    any surplus items on the longer side are ignored.

    Args:
        reference_texts: Reference strings.
        candidate_texts: Candidate strings to score.

    Returns:
        The average METEOR score, or ``0.0`` when there are no pairs.
    """
    # Imported locally under a private alias so that a module-level variable
    # named `meteor_score` (easy to create when storing the result) cannot
    # shadow the scorer this function relies on.
    from nltk.translate.meteor_score import meteor_score as _sentence_meteor

    scores = [
        _sentence_meteor([ref.split()], cand.split())
        for ref, cand in zip(reference_texts, candidate_texts)
    ]
    if not scores:
        # Avoid ZeroDivisionError on empty input.
        return 0.0
    return sum(scores) / len(scores)

# Example: score one generated reply against the reference doctor reply.
reference_texts = ["during this pandemic. throat pain can be from a strep throat infection (antibiotics needed), a cold or influenza or other virus, or from some other cause such as allergies or irritants. usually, a person sees the doctor (call first) if the sore throat is bothersome, recurrent, or doesn't go away quickly. covid-19 infections tend to have cough, whereas strep throat usually lacks cough but has more throat pain. "]
candidate_texts = ["brief brief brief brief brief covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid www www www www https https https https q q q q q q or or or or video video text"]

bleu_score = calculate_bleu(reference_texts, candidate_texts)
rouge_scores = calculate_rouge(reference_texts, candidate_texts)
# Stored under a name other than `meteor_score` so the imported NLTK function
# of that name is not overwritten by a float.
meteor_result = calculate_meteor(reference_texts, candidate_texts)

print("BLEU Score:", bleu_score)
print("ROUGE Scores:", rouge_scores)
print("METEOR Score:", meteor_result)


In [None]:
pip install bert-score

In [None]:
from bert_score import score

def compute_bert_score(predictions, references):
    """Compute mean BERTScore precision, recall and F1 for English texts.

    Args:
        predictions: Candidate strings.
        references: Reference strings, aligned with ``predictions``.

    Returns:
        Dict with keys ``"Precision"``, ``"Recall"`` and ``"F1-Score"``, each
        the mean of the corresponding per-pair values as a Python number.
    """
    precision, recall, f1 = score(predictions, references, lang="en", verbose=True)

    summary = {}
    for label, values in (("Precision", precision), ("Recall", recall), ("F1-Score", f1)):
        summary[label] = values.mean().item()
    return summary

# Example usage: `predictions` holds the generated reply and `references` the
# doctor's actual reply, matching the candidate/reference roles used for the
# BLEU/ROUGE/METEOR example. Swapping them would swap precision and recall.
predictions = ["brief brief brief brief brief covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid covid www www www www https https https https q q q q q q or or or or video video text"]
references = ["during this pandemic. throat pain can be from a strep throat infection (antibiotics needed), a cold or influenza or other virus, or from some other cause such as allergies or irritants. usually, a person sees the doctor (call first) if the sore throat is bothersome, recurrent, or doesn't go away quickly. covid-19 infections tend to have cough, whereas strep throat usually lacks cough but has more throat pain. "]

bert_scores = compute_bert_score(predictions, references)
print(bert_scores)
