In [None]:
# Mount Google Drive into the Colab filesystem; the CSV paths read later in
# this notebook live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, mean_squared_error, confusion_matrix, classification_report
from transformers import RobertaTokenizer, TFRobertaModel
import nltk
from nltk.corpus import stopwords
import re
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Dense, Dropout, Layer
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical
from gensim.corpora import Dictionary
from gensim.models import LdaModel

In [None]:
# Paths of the two review CSV files on the mounted Drive.
train_file_path = '/content/drive/My Drive/Drug review/drugsComTrain_raw.csv'
test_file_path = '/content/drive/My Drive/Drug review/drugsComTest_raw.csv'
train_data = pd.read_csv(train_file_path)
test_data = pd.read_csv(test_file_path)
# Stack both files into one frame. ignore_index=True gives every row a unique
# 0..n-1 label; without it each file keeps its own row labels, so labels
# repeat and any label-based lookup or Series alignment on `data` becomes
# ambiguous.
data = pd.concat([train_data, test_data], ignore_index=True)

In [None]:
# Text preprocessing function
_STOP_WORD_CACHE = {}


def _english_stop_words():
    """Return the NLTK English stop-word list as a frozenset, loading it once."""
    if 'english' not in _STOP_WORD_CACHE:
        _STOP_WORD_CACHE['english'] = frozenset(stopwords.words('english'))
    return _STOP_WORD_CACHE['english']


def preprocess_text(text, stop_words=None):
    """Normalize one review: keep letters/whitespace, lowercase, drop stop words.

    Args:
        text: the raw review. Anything that is not a ``str`` (for example the
            float NaN pandas uses for a missing cell) yields an empty string.
        stop_words: optional collection of lowercase words to remove. When
            omitted, the NLTK English stop-word list is used (loaded on first
            use and then reused for every later call).

    Returns:
        The cleaned text, with the remaining words joined by single spaces.
    """
    if not isinstance(text, str):
        return ''
    if stop_words is None:
        stop_words = _english_stop_words()
    # The flags must be passed by keyword: the fourth positional argument of
    # re.sub is `count`, so passing `re.I | re.A` there silently capped the
    # number of substitutions instead of setting any flag.
    text = re.sub(r'[^a-zA-Z\s]', '', text, flags=re.I | re.A)
    text = text.lower().strip()
    return ' '.join(word for word in text.split() if word not in stop_words)

In [None]:
# Download stopwords
# (preprocess_text reads the NLTK English stop-word list, so the corpus has to
# be available before the .apply below runs.)
nltk.download('stopwords')

# Preprocess the reviews
# The cleaned text is stored in a new 'clean_review' column; the raw 'review'
# column is left as it is.
data['clean_review'] = data['review'].apply(preprocess_text)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
# RoBERTa Tokenizer and Model
# Both objects are loaded from the same 'roberta-base' checkpoint. The load log
# printed under this cell reports the pooler weights as newly initialized.
roberta_tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
roberta_model = TFRobertaModel.from_pretrained('roberta-base')

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFRobertaModel: ['lm_head.dense.bias', 'lm_head.layer_norm.bias', 'lm_head.layer_norm.weight', 'lm_head.bias', 'lm_head.dense.weight', 'roberta.embeddings.position_ids']
- This IS expected if you are initializing TFRobertaModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFRobertaModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
Some weights or buffers of the TF 2.0 model TFRobertaModel were not initialized from the PyTorch model and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and infe

In [None]:
def roberta_features_batch(texts, batch_size=32):
    """Embed texts with RoBERTa and mean-pool the token vectors of each text.

    Uses the notebook-level ``roberta_tokenizer`` and ``roberta_model``.

    Args:
        texts: list of strings to embed.
        batch_size: number of texts tokenized and encoded per forward pass.

    Returns:
        A float tensor with one row per input text and one column per hidden
        unit of the encoder; shape ``(0, hidden_size)`` when ``texts`` is empty.
    """
    all_features = []
    for start in range(0, len(texts), batch_size):
        batch_texts = texts[start:start + batch_size]
        inputs = roberta_tokenizer(batch_texts, return_tensors='tf', padding=True, truncation=True, max_length=512)
        attention_mask = inputs['attention_mask']
        # Hand the mask to the encoder so real tokens do not attend to the
        # padding added to equalize lengths within the batch.
        outputs = roberta_model(
            inputs['input_ids'],
            attention_mask=attention_mask,
            training=False,
        )[0]  # Use last hidden state
        # Masked mean pooling: average over real tokens only, so a short text
        # is not diluted by the padding positions of a long batch neighbour.
        mask = tf.cast(attention_mask, outputs.dtype)[:, :, tf.newaxis]
        token_sum = tf.reduce_sum(outputs * mask, axis=1)
        token_count = tf.maximum(tf.reduce_sum(mask, axis=1), 1.0)
        all_features.append(token_sum / token_count)
    if not all_features:
        # tf.concat rejects an empty list; return an empty feature matrix instead.
        return tf.zeros((0, roberta_model.config.hidden_size), dtype=tf.float32)
    return tf.concat(all_features, axis=0)

X_roberta_features = roberta_features_batch(data['clean_review'].tolist())

In [None]:
# Define the Self-Attention layer
class SelfAttention(Layer):
    """Attention pooling that collapses axis 1 of its input.

    The input is projected with a learned square matrix plus bias and passed
    through tanh; a softmax along axis 1 turns the result into weights, and
    the layer returns the weighted sum of the input over axis 1.
    In this notebook it follows a GRU with return_sequences=True, so axis 1 is
    presumably the timestep axis — TODO confirm for other uses.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        # Both parameters are sized by the last axis of the input.
        feature_dim = input_shape[-1]
        self.W = self.add_weight(
            name='attention_weight',
            shape=(feature_dim, feature_dim),
            initializer='glorot_uniform',
            trainable=True,
        )
        self.b = self.add_weight(
            name='attention_bias',
            shape=(feature_dim,),
            initializer='zeros',
            trainable=True,
        )
        super().build(input_shape)

    def call(self, x):
        projected = tf.tensordot(x, self.W, axes=1) + self.b
        weights = tf.nn.softmax(tf.tanh(projected), axis=1)
        return tf.reduce_sum(weights * x, axis=1)

In [None]:
# Function to build and compile GRU model with self-attention
def build_gru_model(input_shape, output_units, loss, activation='softmax'):
    """Assemble and compile a GRU -> SelfAttention -> Dense network.

    Args:
        input_shape: per-sample input shape handed to the Input layer.
        output_units: number of units in the final Dense layer.
        loss: loss passed to ``compile``.
        activation: activation of the final Dense layer.

    Returns:
        The compiled Sequential model (Adam optimizer, 'accuracy' metric).
    """
    keras_layers = tf.keras.layers
    model = tf.keras.Sequential()
    model.add(keras_layers.Input(shape=input_shape))
    # return_sequences=True keeps every step's output for the attention layer.
    model.add(keras_layers.GRU(units=128, dropout=0.2, recurrent_dropout=0.2, return_sequences=True))
    model.add(SelfAttention())
    model.add(keras_layers.Dense(output_units, activation=activation))
    model.compile(optimizer=Adam(), loss=loss, metrics=['accuracy'])
    return model

In [None]:
# Encode the target variable
le_condition = LabelEncoder()
# LabelEncoder refuses a column that mixes strings with NaN, so rows with a
# missing condition are given an explicit 'Unknown' label before encoding.
# (A no-op when the column has no missing values.)
condition_labels = data['condition'].fillna('Unknown').astype(str)
y_condition = le_condition.fit_transform(condition_labels)

In [None]:
# NOTE(review): these three imports repeat ones already made in the notebook's
# import cell.
from sklearn.model_selection import train_test_split
import numpy as np
import tensorflow as tf

# Convert input_ids and attention_mask to NumPy arrays if they are TensorFlow tensors
# NOTE(review): `input_ids` and `attention_mask` are not assigned in any visible
# cell of this notebook (the tokenizer output inside roberta_features_batch is
# local to that function) — confirm where they are meant to come from; on a
# fresh kernel this cell presumably raises NameError.
input_ids_np = input_ids.numpy() if isinstance(input_ids, tf.Tensor) else input_ids
attention_mask_np = attention_mask.numpy() if isinstance(attention_mask, tf.Tensor) else attention_mask

# Convert target variable to NumPy array
y_condition_np = np.array(y_condition)

# Ensure all data is in the same format
input_ids_np = np.array(input_ids_np)
attention_mask_np = np.array(attention_mask_np)

# Split the data
# All three arrays are split in one call so their rows stay aligned; the fixed
# random_state makes the 80/20 split repeatable.
X_train, X_test, attention_mask_train, attention_mask_test, y_train, y_test = train_test_split(
    input_ids_np, attention_mask_np, y_condition_np, test_size=0.2, random_state=42
)

# Convert y_train and y_test to categorical if needed
# (one-hot targets, as required by the categorical_crossentropy loss used for
# the condition model)
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)

In [None]:
# Build and train the model for condition prediction
# build_gru_model takes the per-sample `input_shape` and has no `output_dim`
# parameter, so the shape is derived from the training features.
# NOTE(review): the GRU layer needs per-sample inputs shaped
# (timesteps, features) — confirm X_train is 3-D before running.
model_condition = build_gru_model(input_shape=X_train.shape[1:], output_units=y_train.shape[1], loss='categorical_crossentropy')
history_condition = model_condition.fit(X_train, y_train, epochs=5, batch_size=64, validation_split=0.1)

In [None]:
# Evaluate the model for condition prediction
# predict() yields one score per class for each row; argmax turns those scores
# and the one-hot y_test back into integer class ids for the sklearn reports.
y_pred_condition = model_condition.predict(X_test)
y_pred_condition_classes = np.argmax(y_pred_condition, axis=1)
y_test_classes = np.argmax(y_test, axis=1)
print('Classification Report for Condition Prediction:\n', classification_report(y_test_classes, y_pred_condition_classes))
print('Confusion Matrix for Condition Prediction:\n', confusion_matrix(y_test_classes, y_pred_condition_classes))

In [None]:
# Task 02: Estimating drug ratings from reviews (Regression)
# NOTE(review): `X` is not assigned in any visible cell of this notebook —
# confirm which feature matrix is meant before running.
y_rating = data['rating']
X_train, X_test, y_train, y_test = train_test_split(X, y_rating, test_size=0.2, random_state=42)
# build_gru_model takes the per-sample `input_shape` and has no `output_dim`
# parameter, so the shape is derived from the training features.
model_rating = build_gru_model(input_shape=X_train.shape[1:], output_units=1, loss='mean_squared_error', activation='linear')
history_rating = model_rating.fit(X_train, y_train, epochs=5, batch_size=64, validation_split=0.1)
y_pred_rating = model_rating.predict(X_test)
print('Mean Squared Error for Rating Prediction:', mean_squared_error(y_test, y_pred_rating))

In [None]:
# Task 03: Identifying elements that make reviews helpful (Linear Regression)
# Missing counts are treated as zero so the target is a plain integer column.
data['usefulCount'] = data['usefulCount'].fillna(0).astype(int)
y_helpful = data['usefulCount']
# NOTE(review): `X` is not assigned in any visible cell of this notebook —
# confirm which feature matrix is meant before running.
X_train, X_test, y_train, y_test = train_test_split(X, y_helpful, test_size=0.2, random_state=42)
# build_gru_model takes the per-sample `input_shape` and has no `output_dim`
# parameter, so the shape is derived from the training features.
model_helpful = build_gru_model(input_shape=X_train.shape[1:], output_units=1, loss='mean_squared_error', activation='linear')
history_helpful = model_helpful.fit(X_train, y_train, epochs=5, batch_size=64, validation_split=0.1)
y_pred_helpful = model_helpful.predict(X_test)
print('Mean Squared Error for Helpful Prediction:', mean_squared_error(y_test, y_pred_helpful))

In [None]:
# Function to map ratings to sentiments
def get_sentiment(score):
    """Bucket a rating into a sentiment class id.

    Returns 2 (positive) for scores of 7 or more, 0 (negative) for scores of
    4 or less, and 1 (neutral) for everything else.
    """
    negative, neutral, positive = 0, 1, 2
    if score >= 7:
        return positive
    return negative if score <= 4 else neutral

# Add sentiment column to the data
data['sentiment'] = data['rating'].apply(get_sentiment)
y_sentiment = data['sentiment']
# NOTE(review): `X` is not assigned in any visible cell of this notebook —
# confirm which feature matrix is meant before running.
X_train, X_test, y_train, y_test = train_test_split(X, y_sentiment, test_size=0.2, random_state=42)
# One-hot targets for the categorical_crossentropy loss.
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)

# Build and train the model for sentiment analysis
# build_gru_model takes the per-sample `input_shape` and has no `output_dim`
# parameter, so the shape is derived from the training features.
model_sentiment = build_gru_model(input_shape=X_train.shape[1:], output_units=y_train.shape[1], loss='categorical_crossentropy')
history_sentiment = model_sentiment.fit(X_train, y_train, epochs=5, batch_size=64, validation_split=0.1)

# Evaluate the model for sentiment analysis
y_pred_sentiment = model_sentiment.predict(X_test)
y_pred_sentiment_classes = np.argmax(y_pred_sentiment, axis=1)
y_test_classes = np.argmax(y_test, axis=1)
print('Classification Report for Sentiment Analysis:\n', classification_report(y_test_classes, y_pred_sentiment_classes))

In [None]:
# Task 05: Classifying reviews as positive, negative, or neutral (Classification)
# NOTE(review): this cell repeats the split and training of the sentiment cell
# above it with the same data and seed, and overwrites model_sentiment and
# history_sentiment — consider evaluating the already-trained model instead.
# NOTE(review): `X` is not assigned in any visible cell of this notebook —
# confirm which feature matrix is meant before running.
y_sentiment = data['sentiment']
X_train, X_test, y_train, y_test = train_test_split(X, y_sentiment, test_size=0.2, random_state=42)
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)
# build_gru_model takes the per-sample `input_shape` and has no `output_dim`
# parameter, so the shape is derived from the training features.
model_sentiment = build_gru_model(input_shape=X_train.shape[1:], output_units=y_train.shape[1], loss='categorical_crossentropy')
history_sentiment = model_sentiment.fit(X_train, y_train, epochs=5, batch_size=64, validation_split=0.1)
y_pred_sentiment = model_sentiment.predict(X_test)
y_pred_sentiment_classes = np.argmax(y_pred_sentiment, axis=1)
y_test_classes = np.argmax(y_test, axis=1)
print('Classification Report for Sentiment Classification:\n', classification_report(y_test_classes, y_pred_sentiment_classes))
print('Confusion Matrix for Sentiment Classification:\n', confusion_matrix(y_test_classes, y_pred_sentiment_classes))
print('Accuracy for Sentiment Classification:', accuracy_score(y_test_classes, y_pred_sentiment_classes))

In [None]:
# Plot training history
def plot_training_history(history, title):
    """Plot training and validation accuracy per epoch.

    Args:
        history: object whose ``.history`` mapping holds the 'accuracy' and
            'val_accuracy' series (e.g. the value returned by ``model.fit``).
        title: title shown above the plot.
    """
    curves = history.history
    # Each series is labelled with its own key so the legend matches the data.
    for metric_name in ('accuracy', 'val_accuracy'):
        plt.plot(curves[metric_name], label=metric_name)
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.ylim([0, 1])
    plt.legend(loc='lower right')
    plt.title(title)
    plt.show()

plot_training_history(history_sentiment, 'Sentiment Classification Accuracy')

In [None]:
# Task 06: Exploring drugs and its associated conditions (Filtering)
def explore_drug(drug_name, df=None):
    """Return the reviews whose drug name contains ``drug_name``.

    The match is a case-insensitive literal substring test, so characters such
    as '(' , '+' or '.' in the search text are matched as typed instead of
    being interpreted as a regular expression.

    Args:
        drug_name: text to look for in the 'drugName' column.
        df: frame to search; defaults to the notebook-level ``data`` frame.

    Returns:
        A frame with the columns 'drugName', 'condition', 'review' and
        'rating'. When nothing matches, a message is printed and the frame
        is empty.
    """
    columns = ['drugName', 'condition', 'review', 'rating']
    source = data if df is None else df
    # na=False: rows without a drug name never match.
    is_match = source['drugName'].str.contains(drug_name, case=False, na=False, regex=False)
    filtered_data = source[is_match]
    if filtered_data.empty:
        print(f"No data found for drug: {drug_name}")
        return pd.DataFrame(columns=columns)
    return filtered_data[columns]

In [None]:
# Example usage
# explore_drug returns an empty frame when nothing matches, so only print on a hit.
explored_drug = explore_drug('aspirin')
if not explored_drug.empty:
    print(explored_drug.head())

In [None]:
# Visualizing the distribution of conditions and ratings for a specific drug
def plot_drug_distribution(drug_name):
    """Draw side-by-side bar charts of conditions and ratings for one drug.

    Looks the drug up with ``explore_drug`` and returns without plotting when
    no reviews match. The left panel counts reviews per condition; the right
    panel counts reviews per rating, ordered by rating value.
    """
    drug_rows = explore_drug(drug_name)
    if drug_rows.empty:
        return

    plt.figure(figsize=(12, 6))
    for panel, column in enumerate(('condition', 'rating'), start=1):
        plt.subplot(1, 2, panel)
        counts = drug_rows[column].value_counts()
        if column == 'rating':
            # Ratings read best in numeric order rather than by frequency.
            counts = counts.sort_index()
        counts.plot(kind='bar')
        axis_label = column.capitalize()
        plt.title(f'{axis_label} Distribution for {drug_name}')
        plt.xlabel(axis_label)
        plt.ylabel('Count')

    plt.tight_layout()
    plt.show()

# Example usage
plot_drug_distribution('aspirin')

In [None]:
# Tokenize the cleaned reviews for topic modeling.
# Dictionary and LdaModel are already imported in the notebook's import cell.
# The 'clean_review' column is normally present from the preprocessing cell, so
# the (slow) cleaning pass over every review is only repeated when it is missing.
if 'clean_review' not in data.columns:
    data['clean_review'] = data['review'].apply(preprocess_text)
reviews_clean = [review.split() for review in data['clean_review']]

# Create a Gensim dictionary and corpus
dictionary = Dictionary(reviews_clean)
corpus = [dictionary.doc2bow(review) for review in reviews_clean]

# Train an LDA model
# A fixed random_state makes the learned topics repeatable between runs; 42
# matches the seed used for the train/test splits in this notebook.
lda_model = LdaModel(corpus, num_topics=5, id2word=dictionary, passes=10, random_state=42)

In [None]:
def identify_reason(review_text):
    """Name the first known complaint phrase found in a review.

    Placeholder keyword heuristic: the review is lowercased and checked against
    a fixed list of phrases in order; the first phrase contained in the text is
    returned, or "General dissatisfaction" when none is present.
    """
    negative_keywords = ('side effect', 'not effective', 'bad experience', 'expensive', 'poor quality')
    lowered = review_text.lower()
    return next(
        (phrase for phrase in negative_keywords if phrase in lowered),
        "General dissatisfaction",
    )

In [None]:
# Define the necessary functions for main
def predict_rating(review_text, rating_model, tokenizer, maxlen):
    """Predict a rating for one review.

    Args:
        review_text: the review to score.
        rating_model: model whose ``predict`` returns one value per row.
        tokenizer: object providing ``texts_to_sequences``.
        maxlen: length every encoded review is padded/truncated to.

    Returns:
        The single predicted value for the review.
    """
    review_seq = tokenizer.texts_to_sequences([review_text])
    # `pad_sequences` is not imported anywhere in this notebook, so it is
    # reached through the already-imported `tf` module.
    review_pad = tf.keras.preprocessing.sequence.pad_sequences(review_seq, maxlen=maxlen, padding='post')
    # verbose=0: this runs once per review, so the progress bar is only noise.
    predicted_rating = rating_model.predict(review_pad, verbose=0)
    return predicted_rating[0][0]

def predict_sentiment(review_text, sentiment_model, tokenizer, maxlen):
    """Predict the sentiment label of one review.

    Args:
        review_text: the review to classify.
        sentiment_model: model whose ``predict`` returns three class scores
            per row, in the order negative, neutral, positive.
        tokenizer: object providing ``texts_to_sequences``.
        maxlen: length every encoded review is padded/truncated to.

    Returns:
        'Negative', 'Neutral' or 'Positive'.
    """
    review_seq = tokenizer.texts_to_sequences([review_text])
    # `pad_sequences` is not imported anywhere in this notebook, so it is
    # reached through the already-imported `tf` module.
    review_pad = tf.keras.preprocessing.sequence.pad_sequences(review_seq, maxlen=maxlen, padding='post')
    # verbose=0: this runs once per review, so the progress bar is only noise.
    predicted_sentiment = sentiment_model.predict(review_pad, verbose=0)
    sentiment_class = np.argmax(predicted_sentiment, axis=1)[0]
    return ['Negative', 'Neutral', 'Positive'][sentiment_class]

def predict_drug_info(drug_name, train_df, test_df, rating_model, sentiment_model, tokenizer, maxlen, label_encoder, dictionary, lda_model):
    """Collect conditions, sample reviews with predictions, and LDA topics for a drug.

    Args:
        drug_name: text matched (case-insensitive, literal substring) against
            the 'drugName' column.
        train_df, test_df: review frames that are stacked and searched.
        rating_model, sentiment_model, tokenizer, maxlen: forwarded to
            ``predict_rating`` / ``predict_sentiment``.
        label_encoder, dictionary: accepted for interface compatibility; not
            used by this function.
        lda_model: trained topic model providing ``show_topics``.

    Returns:
        ``(conditions, reviews, topics)`` where ``conditions`` is the list of
        distinct non-missing conditions among the matching reviews, ``reviews``
        is a frame of at most five matching reviews with the columns 'review',
        'rating', 'predicted_rating' and 'predicted_sentiment', and ``topics``
        is ``lda_model.show_topics(num_words=4)``.
    """
    # Filtering and merging data
    drug_data = pd.concat([train_df, test_df])
    # regex=False: the drug name is user text, not a pattern.
    is_match = drug_data['drugName'].str.contains(drug_name, case=False, na=False, regex=False)
    filtered_data = drug_data[is_match]

    # Conditions recorded for the matching reviews (missing values skipped).
    conditions = filtered_data['condition'].dropna().unique().tolist()

    # Only the first five reviews are returned, so only those are scored.
    # .copy() makes `reviews` an independent frame, so adding the prediction
    # columns does not write into a slice of `drug_data`.
    reviews = filtered_data[['review', 'rating']].head(5).copy()
    reviews['predicted_rating'] = reviews['review'].apply(lambda x: predict_rating(x, rating_model, tokenizer, maxlen))
    reviews['predicted_sentiment'] = reviews['review'].apply(lambda x: predict_sentiment(x, sentiment_model, tokenizer, maxlen))

    # Topics of the trained LDA model as a whole; they are not specific to
    # the selected drug.
    topics = lda_model.show_topics(num_words=4)

    return conditions, reviews, topics

def main():
    """Run the interactive text menu until the user chooses Exit.

    Reads choices and review/drug text from ``input`` and prints the results.
    Relies on notebook-level names: model_rating, model_sentiment, train_data,
    test_data, le_condition, dictionary, lda_model, tokenizer and maxlen.
    NOTE(review): `tokenizer` and `maxlen` are not assigned in any visible cell
    of this notebook — confirm they are defined before calling main().
    """
    while True:
        print("\nWelcome to Drug Review Analysis System!")
        print("1. Analyze Rating of a Review")
        print("2. Get Drug Information")
        print("3. Reason behind negative review")
        print("4. Exit")
        # The menu has four entries, so the prompt lists all four; surrounding
        # whitespace in the answer is ignored.
        choice = input("Enter your choice (1/2/3/4): ").strip()

        if choice == '1':
            review_text = input("Enter your review text: ")
            rating = predict_rating(review_text, model_rating, tokenizer, maxlen)
            sentiment = predict_sentiment(review_text, model_sentiment, tokenizer, maxlen)
            print(f"Predicted Rating: {rating:.2f}")
            print(f"Predicted Sentiment: {sentiment}")

        elif choice == '2':
            drug_name = input("Enter the name of the drug: ")
            conditions, reviews, topics = predict_drug_info(drug_name, train_data, test_data, model_rating, model_sentiment, tokenizer, maxlen, le_condition, dictionary, lda_model)

            print(f"\nConditions for using {drug_name}:")
            for condition in conditions:
                print(f"- {condition}")

            print(f"\nSome Reviews for {drug_name}:")
            # Number the reviews 1..n by position; the frame's index holds the
            # row labels of the source data, not a display counter.
            for position, (_, row) in enumerate(reviews.iterrows(), start=1):
                print(f"Review {position}:")
                print(f"Rating: {row['rating']}")
                print(f"Predicted Rating: {row['predicted_rating']:.2f}")
                print(f"Predicted Sentiment: {row['predicted_sentiment']}")
                print(f"Review: {row['review']}")
                print()

            print("Topics in Reviews:")
            for topic in topics:
                print(topic)

        elif choice == '3':
            review_text = input("Enter your review text: ")
            # Same model as option 1, so it must get the same padded length;
            # this branch previously hard-coded 200 instead of `maxlen`.
            sentiment = predict_sentiment(review_text, model_sentiment, tokenizer, maxlen)
            if sentiment == 'Negative':
                reason = identify_reason(review_text)
                print(f"Reason for Negative Review: {reason}")
            else:
                print("The review is not negative.")

        elif choice == '4':
            print("Exiting the program...")
            break

        else:
            print("Invalid choice. Please enter 1, 2, 3, or 4.")

if __name__ == "__main__":
    main()