# DATA603 Big Data Processing Project 
Group 3: Pooja Kangokar Pranesh, Yun-Zih Chen, Elizabeth Cardosa

The goal of this project is to leverage big data technologies to train a model on the UCI ML Drug Review dataset that predicts the star rating of a drug from the sentiment of its review. The model will then perform streaming inference on ‘real-time’ reviews as they arrive. The resulting application can help potential customers understand the overall sentiment towards a drug and judge whether it might be useful for them.


In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
working_folder = "/content/drive/My Drive/UMBC Fall 2022/DATA603 Big Data Processing/Project/Data/"

# Install Libraries and Dependencies

In [3]:
"""!pip install -qq pyspark 
!pip install -qq spark-nlp 
!pip install -qq findspark """


In [4]:
# Install PySpark and Spark NLP
! pip install -qq pyspark==3.2.1 spark-nlp findspark #pyspark==3.1.2 spark-nlp findspark

In [5]:
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

--2022-11-28 02:29:17--  http://setup.johnsnowlabs.com/colab.sh
Resolving setup.johnsnowlabs.com (setup.johnsnowlabs.com)... 51.158.130.125
Connecting to setup.johnsnowlabs.com (setup.johnsnowlabs.com)|51.158.130.125|:80... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://setup.johnsnowlabs.com/colab.sh [following]
--2022-11-28 02:29:17--  https://setup.johnsnowlabs.com/colab.sh
Connecting to setup.johnsnowlabs.com (setup.johnsnowlabs.com)|51.158.130.125|:443... connected.
HTTP request sent, awaiting response... 302 Moved Temporarily
Location: https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh [following]
--2022-11-28 02:29:17--  https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/scripts/colab_setup.sh
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:44

In [6]:
import pyspark.pandas as ps
import pandas as pd



In [7]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext

In [8]:
from sparknlp.pretrained import PretrainedPipeline
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *

In [9]:
"""# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
spark"""


In [10]:
spark = sparknlp.start()

print("Spark NLP version: {}".format(sparknlp.version()))
print("Apache Spark version: {}".format(spark.version))

Spark NLP version: 4.2.3
Apache Spark version: 3.2.1


In [11]:
sc = SparkContext.getOrCreate()

# Read-in Dataset


## Dataset: https://archive.ics.uci.edu/ml/datasets/Drug+Review+Dataset+%28Drugs.com%29


The dataset provides patient reviews on specific drugs along with related conditions and a 10 star patient rating reflecting overall patient satisfaction. The data was obtained by crawling online pharmaceutical review sites. The intention was to study

- sentiment analysis of drug experience over multiple facets, i.e. sentiments learned on specific aspects such as effectiveness and side effects,
- the transferability of models among domains, i.e. conditions, and
- the transferability of models among different data sources (see 'Drug Review Dataset (Druglib.com)').

The data is split into a train (75%) and a test (25%) partition (see publication) and stored in two .tsv (tab-separated-values) files, respectively.

Attribute Information:

1. drugName (categorical): name of drug
2. condition (categorical): name of condition
3. review (text): patient review
4. rating (numerical): 10 star patient rating
5. date (date): date of review entry
6. usefulCount (numerical): number of users who found review useful


Important notes:

When using this dataset, you agree that you
1. only use the data for research purposes
2. don't use the data for any commercial purposes
3. don't distribute the data to anyone else
4. cite us

Felix Gräßer, Surya Kallumadi, Hagen Malberg, and Sebastian Zaunseder. 2018. Aspect-Based Sentiment Analysis of Drug Reviews Applying Cross-Domain and Cross-Data Learning. In Proceedings of the 2018 International Conference on Digital Health (DH '18). ACM, New York, NY, USA, 121-125.

## Load in Test Data

In [12]:
# Define the schema for the test data file
customschema = StructType([
  StructField("UniqueID", IntegerType(), True)
  ,StructField("drugName", StringType(), True)
  ,StructField("condition", StringType(), True)
  ,StructField("review", StringType(), True)
  ,StructField("rating", DoubleType(), True)
  ,StructField("date", StringType(), True)
  ,StructField("usefulCount", IntegerType(), True)
  ])

In [13]:
df_test = spark.read.format("csv")\
           .option("delimiter", "\t")\
           .option("header", "true")\
           .option("quote", "\"")\
           .option("escape", "\"")\
           .option("multiLine","true")\
           .option("quoteMode","ALL")\
           .option("mode","PERMISSIVE")\
           .option("ignoreLeadingWhiteSpace","true")\
           .option("ignoreTrailingWhiteSpace","true")\
           .option("parserLib","UNIVOCITY")\
           .schema(customschema)\
           .load(working_folder + "drugsComTest_raw.tsv")

In [None]:
df_test.count()

In [None]:
df_test.show(5)
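
The `quote`, `escape`, and `multiLine` options above matter because a review can be wrapped in quotes and span several physical lines. The sketch below illustrates the same idea with Python's standard `csv` module rather than Spark. The two sample rows are hypothetical and only mimic the layout of the dataset's files.

```python
import csv
import io

# Hypothetical sample in the same tab-separated layout as the dataset.
# The first review spans two physical lines; the second contains doubled quotes.
sample = (
    'UniqueID\tdrugName\tcondition\treview\trating\tdate\tusefulCount\n'
    '1\tDrugA\tPain\t"""Worked well.\nNo side effects."""\t9.0\tMay 1, 2015\t12\n'
    '2\tDrugB\tAcne\t"""It was ""okay"" for me"""\t5.0\tJune 3, 2016\t3\n'
)

# A naive line count treats the embedded newline as a record boundary.
naive_lines = len(sample.splitlines())

# A quote-aware parser keeps the multi-line review inside a single record
# and collapses each doubled quote ("") into one literal quote character.
rows = list(csv.reader(io.StringIO(sample), delimiter='\t', quotechar='"'))

print(naive_lines)   # physical lines, including the header
print(len(rows))     # logical records, including the header
print(rows[1][3])    # the multi-line review as one field
```

Comparing the two counts shows why a plain line-based read would split some reviews into malformed rows.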

## Load in and Explore Training Data

In [None]:
# Read in training data file
customschema = StructType([
  StructField("UniqueID", IntegerType(), True)
  ,StructField("drugName", StringType(), True)
  ,StructField("condition", StringType(), True)
  ,StructField("review", StringType(), True)
  ,StructField("rating", DoubleType(), True)
  ,StructField("date", StringType(), True)
  ,StructField("usefulCount", IntegerType(), True)
  ])

df = spark.read.format("csv")\
           .option("delimiter", "\t")\
           .option("header", "true")\
           .option("quote", "\"")\
           .option("escape", "\"")\
           .option("multiLine","true")\
           .option("quoteMode","ALL")\
           .option("mode","PERMISSIVE")\
           .option("ignoreLeadingWhiteSpace","true")\
           .option("ignoreTrailingWhiteSpace","true")\
           .option("parserLib","UNIVOCITY")\
           .schema(customschema)\
           .load(working_folder + "drugsComTrain_raw.tsv")

In [None]:
df.count()

In [None]:
df.show(5)

In [None]:
#pd_df = df.toPandas()

### Clean Training Dataset

In [None]:
# Remove rows with null columns
df = df.dropna()

In [None]:
df.count()

In [None]:
# Drop rows whose condition contains a leftover </span> tag
df = df.where(~df.condition.contains("</span>"))

In [None]:
df.count()
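
The two cleaning steps above (drop rows with any null value, then drop rows whose condition contains a stray `</span>` tag) can be sketched in plain Python. The rows and the `is_clean` helper are hypothetical and exist only to illustrate the rule. They are not part of the notebook's pipeline.

```python
# Hypothetical rows mirroring a few of the dataset's columns; None stands in for a null.
rows = [
    {"drugName": "DrugA", "condition": "Pain", "review": "Helped a lot", "rating": 9.0},
    {"drugName": "DrugB", "condition": None, "review": "No change", "rating": 5.0},
    {"drugName": "DrugC", "condition": "3</span> users found this comment helpful.",
     "review": "Did not work", "rating": 2.0},
]

def is_clean(row):
    """Keep a row only if no field is null and the condition has no </span> tag."""
    if any(value is None for value in row.values()):
        return False
    return "</span>" not in row["condition"]

cleaned = [row for row in rows if is_clean(row)]
print([row["drugName"] for row in cleaned])
```

Checking for nulls first mirrors the order of the notebook cells and avoids testing a missing condition for the tag.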

In [None]:
df.groupby('rating').count().orderBy("rating", ascending=False).show()

In [None]:
# Average Star Rating by Condition
df.groupBy("condition").agg({'rating':'avg', 'condition':'count'}).orderBy("count(condition)",ascending=False).show()

In [None]:
# Average Star Rating by Drug Name 
df.groupBy("drugName").agg({'rating':'avg', 'drugName':'count'}).orderBy("count(drugName)",ascending=False).show()
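
The grouped aggregations above compute a row count and a mean rating per group, then sort by count in descending order. A minimal plain-Python sketch of the same computation, using made-up `(condition, rating)` pairs:

```python
from collections import defaultdict

# Hypothetical (condition, rating) pairs standing in for the DataFrame rows.
ratings = [("Pain", 9.0), ("Pain", 7.0), ("Acne", 4.0), ("Pain", 8.0), ("Acne", 6.0)]

# condition -> [number of reviews, sum of ratings]
totals = defaultdict(lambda: [0, 0.0])
for condition, rating in ratings:
    totals[condition][0] += 1
    totals[condition][1] += rating

# One (condition, count, average rating) tuple per group, largest groups first.
summary = sorted(
    ((cond, count, total / count) for cond, (count, total) in totals.items()),
    key=lambda item: item[1],
    reverse=True,
)
print(summary)
```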

In [None]:
pd_df_train = df.toPandas()

In [None]:
pd_df_train.to_csv("testing.csv")

In [None]:
import py4j.protocol  
from py4j.protocol import Py4JJavaError  
from py4j.java_gateway import JavaObject  
from py4j.java_collections import JavaArray, JavaList

from pyspark import RDD, SparkContext  
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer


# Helper function to convert python object to Java objects
def _to_java_object_rdd(rdd):  
    """ Return a JavaRDD of Object by unpickling
    It will convert each Python object into Java object by Pyrolite, whenever the
    RDD is serialized in batch or not.
    """
    rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
    return rdd.ctx._jvm.org.apache.spark.mllib.api.python.SerDe.pythonToJava(rdd._jrdd, True)

# First you have to convert it to an RDD 
JavaObj = _to_java_object_rdd(df.rdd)

# Now we can run the estimator
sc._jvm.org.apache.spark.util.SizeEstimator.estimate(JavaObj)

# Use John Snow Labs pretrained sentiment models pipeline


https://nlp.johnsnowlabs.com/

Medium Article: 
https://medium.com/analytics-vidhya/sentiment-analysis-with-sparknlp-couldnt-be-easier-2a8ea3b728a0

John Snow Labs Reference Notebook: 
https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/jupyter/quick_start_google_colab.ipynb#scrollTo=tyMMD_upEfIa

This BioBERT-based model might perform better on drug reviews, but it is not available on the free tier:
https://nlp.johnsnowlabs.com/2022/07/28/bert_sequence_classifier_drug_reviews_webmd_en_3_0.html


### Use Twitter Sentiment Analysis Model: analyze_sentimentdl_use_twitter 

Model: https://nlp.johnsnowlabs.com/2021/01/18/sentimentdl_use_twitter_en.html

Universal Sentence Encoder: https://nlp.johnsnowlabs.com/2020/04/17/tfhub_use.html

In [None]:
"""pipeline = PretrainedPipeline('analyze_sentimentdl_use_twitter', 'en')
pipeline.model.stages
# rename the text column as 'text', pipeline expects 'text' 
df_result = pipeline.transform(df.withColumnRenamed("review", "text"))
# Extract results from the "sentiments" column
df_twitter_sentiments = df_result.withColumn("sentiment", explode('sentiment.result')).drop(*['document','sentence_embeddings'])"""


The Twitter sentiment model labeled the vast majority of the reviews as negative (123,430 of 161,297):

| sentiment | count |
|---|---|
| positive | 31299 |
| neutral | 6568 |
| negative | 123430 |
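
The share of each label follows directly from the counts in the table above. A short check in plain Python, using only those three counts:

```python
# Counts copied from the sentiment table above.
counts = {"positive": 31299, "neutral": 6568, "negative": 123430}

total = sum(counts.values())

# Percentage of reviews per label, rounded to one decimal place.
shares = {label: round(100 * n / total, 1) for label, n in counts.items()}

print(total)
print(shares)
```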



In [None]:
# took 20 minutes to run
#df_twitter_sentiments.groupBy('sentiment').count().show()

### Use RoBERTa Sentiment Classifier: roberta_classifier_autotrain_sentiment_polarity_918130222

Model: https://nlp.johnsnowlabs.com/2022/09/19/roberta_classifier_autotrain_sentiment_polarity_918130222_en.html

HuggingFace: https://huggingface.co/docs/transformers/model_doc/roberta

A breakdown of how the pretrained pipeline works under the hood: https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/streamlit_notebooks/SENTIMENT_EN.ipynb

In [None]:
documentAssembler = DocumentAssembler() \
        .setInputCol("review") \
        .setOutputCol("document")

tokenizer = Tokenizer() \
    .setInputCols("document") \
    .setOutputCol("token")

seq_classifier = RoBertaForSequenceClassification.pretrained("roberta_classifier_autotrain_sentiment_polarity_918130222","en") \
    .setInputCols(["document", "token"]) \
    .setOutputCol("class")

In [None]:
from pyspark.ml import Pipeline  # Pipeline is not explicitly imported in the cells above

nlp_pipeline = Pipeline(stages=[documentAssembler, tokenizer, seq_classifier])

In [None]:
#df_train = pipeline.fit(df.withColumnRenamed("review", "text")).transform(df.withColumnRenamed("review", "text"))
df_train = nlp_pipeline.fit(df).transform(df)

In [None]:
df_train = df_train.withColumn("sentiment", explode('class.result')).drop('document','token','class')
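
The `explode` call above turns the array stored in `class.result` into one row per element, which yields a flat `sentiment` column. A plain-Python sketch of that behavior follows. The rows and label values are hypothetical, and the `class_result` key is only a stand-in for the nested `class.result` field.

```python
# Hypothetical rows; each holds the list of labels the classifier produced.
rows = [
    {"UniqueID": 1, "class_result": ["positive"]},
    {"UniqueID": 2, "class_result": ["negative"]},
    {"UniqueID": 3, "class_result": []},  # no prediction for this row
]

# One output row per list element, as explode does for array columns.
# A row with an empty list contributes nothing, so it disappears from the output.
exploded = [
    {"UniqueID": row["UniqueID"], "sentiment": label}
    for row in rows
    for label in row["class_result"]
]
print(exploded)
```

Because rows with an empty array are dropped, comparing counts before and after this step is a cheap way to detect reviews that received no label.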

In [None]:
df_train.show()

In [None]:
# Note: this only builds a Column expression; the result is not assigned, so df_train is unchanged
df_train.sentiment.cast(IntegerType())

In [None]:
df_train.show()

In [None]:
# First you have to convert it to an RDD 
JavaObj = _to_java_object_rdd(df_train.rdd)

# Now we can run the estimator
sc._jvm.org.apache.spark.util.SizeEstimator.estimate(JavaObj)

In [None]:
df_train.write.mode('overwrite').parquet(working_folder + "drug_reviews_with_sentiment_train.parquet")

In [None]:
# Write complete dataframe to disk
#pd_df_train = df_train.toPandas()

In [None]:
#pd_df_train.to_csv(working_folder + "drug_reviews_with_sentiment_train.csv")

In [None]:
# Drop rows with missing values
df_test = df_test.dropna()

In [None]:
# Drop rows where condition contains irrelevant strings
df_test = df_test.where(~df_test.condition.contains("</span>"))

In [None]:
df_test.count()

In [None]:
# Use the same Spark NLP pipeline as for the training data ("pipeline" is never defined above)
df_test = nlp_pipeline.fit(df_test).transform(df_test)

In [None]:
df_test = df_test.withColumn("sentiment", explode('class.result')).drop(*['token','class','document'])

In [None]:
df_test.show()

In [None]:
# Write complete dataframe to disk
#df_test.write.csv(working_folder + "drug_reviews_with_sentiment_test.csv")
#df_test.toPandas().to_csv(working_folder + "drug_reviews_with_sentiment_test.csv")
df_test.write.mode('overwrite').parquet(working_folder + "drug_reviews_with_sentiment_test.parquet")