## Import Libraries

In [1]:
SparkContext.setSystemProperty('spark.executor.pyspark.memory', '8g')
from pyspark import SparkFiles
import os
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import pandas as pd
from pyspark.mllib.util import MLUtils

In [2]:
#!pip install pytest-shutil

In [3]:
#import pickle

In [4]:
#!pip install textblob

In [5]:
#!pip install vaderSentiment

In [6]:
%%time
## Testing the combined CSV from S3 bucket.
url= "https://icdrive1.s3.amazonaws.com/drug_review_data_combined.csv"
#url="https://icdrive1.s3.amazonaws.com/drug-use-by-age.csv"
spark.sparkContext.addFile(url)
df_combined = spark.read.option('header', 'true').csv(SparkFiles.get("drug_review_data_combined.csv"), inferSchema=True, sep=",", multiLine=True)


Wall time: 40.3 s


**Build Pandas Dataframe for perfroming Sentimental Analysis**

In [8]:
train_df = df_combined.toPandas()
train_df["pos"] = ""
train_df["neg"] = ""

In [9]:
train_df['label'] = train_df['rating'].apply(lambda x: 1 if x>5 else 0)

In [10]:
train_df.head()

Unnamed: 0,_c0,uniqueID,drugName,condition,review,rating,usefulCount,review_date,review_outcome,pos,neg,label
0,0,206461,Valsartan,Left Ventricular Dysfunction,"It has no side effect, I take it in combinatio...",9,27,2012-05-20,Positive,,,1
1,1,95260,Guanfacine,ADHD,My son is halfway through his fourth week of I...,8,192,2010-04-27,Positive,,,1
2,2,92703,Lybrel,Birth Control,"I used to take another oral contraceptive, whi...",5,17,2009-12-14,Negative,,,0
3,3,138000,Ortho Evra,Birth Control,This is my first time using any form of birth ...,8,10,2015-11-03,Positive,,,1
4,4,35696,Buprenorphine / naloxone,Opiate Dependence,Suboxone has completely turned my life around....,9,37,2016-11-27,Positive,,,1


## Perform Sentiment Analysis on Training Data

In [11]:
analyzer = SentimentIntensityAnalyzer()

In [12]:
res = analyzer.polarity_scores(train_df["review"][0])
res

{'neg': 0.136, 'neu': 0.864, 'pos': 0.0, 'compound': -0.296}

In [13]:
%%time
#review_df = review_df.withColumnRenamed("review_outcome", "label")
train_df["pos"] = train_df["review"].apply(lambda x: analyzer.polarity_scores(x)['pos'])

Wall time: 6min 15s


In [14]:
%%time
#result = train_df["review"].apply(lambda x: analyzer.polarity_scores(x))

Wall time: 0 ns


In [15]:
%%time
train_df["neg"] = train_df["review"].apply(lambda x: analyzer.polarity_scores(x)['neg'])

Wall time: 6min 4s


In [16]:
# Finding the length of review
#from pyspark.sql.functions import length
#review_df = review_df.withColumn('review_length',length(review_df['review']))
#review_df.cache()
#review_df.show()
train_df.head()

Unnamed: 0,_c0,uniqueID,drugName,condition,review,rating,usefulCount,review_date,review_outcome,pos,neg,label
0,0,206461,Valsartan,Left Ventricular Dysfunction,"It has no side effect, I take it in combinatio...",9,27,2012-05-20,Positive,0.0,0.136,1
1,1,95260,Guanfacine,ADHD,My son is halfway through his fourth week of I...,8,192,2010-04-27,Positive,0.111,0.018,1
2,2,92703,Lybrel,Birth Control,"I used to take another oral contraceptive, whi...",5,17,2009-12-14,Negative,0.083,0.062,0
3,3,138000,Ortho Evra,Birth Control,This is my first time using any form of birth ...,8,10,2015-11-03,Positive,0.093,0.027,1
4,4,35696,Buprenorphine / naloxone,Opiate Dependence,Suboxone has completely turned my life around....,9,37,2016-11-27,Positive,0.178,0.064,1


In [19]:
#train_df.to_csv('https://icdrive1.s3.amazonaws.com/drug_review_data_cleaned.csv', index=False)
train_df.to_csv('drug_review_data_cleaned.csv')

## Put Pandas Dataframe into Spark Dataframe with selected columns

In [20]:
%%time
tr_spark_df = spark.createDataFrame(train_df[["condition","review","pos","neg","label"]])

Wall time: 10.6 s


## Create Data Pipeline

In [21]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
# Create all the features to the data set
tokenizer = Tokenizer(inputCol='review', outputCol='token_text')
tokened_df = tokenizer.transform(tr_spark_df)
tokened_df.show(truncate=False)

+----------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+-----+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [22]:
stopremove = StopWordsRemover(inputCol='token_text', outputCol='stop_tokens')
stop_df = stopremove.transform(tokened_df)

In [23]:
hashingTF = HashingTF(inputCol="token_text", outputCol='hash_token')
hash_df = hashingTF.transform(stop_df)

In [24]:
idf = IDF(inputCol='hash_token', outputCol='idf_token')
idfModel = idf.fit(hash_df)
idf_df = idfModel.transform(hash_df)


In [25]:
%%time
from pyspark.ml.feature import VectorAssembler

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token'], outputCol='features')

Wall time: 1.5 s


In [26]:
%%time
# Create and run a data processing Pipeline
from pyspark.ml import Pipeline
data_prep_pipeline = Pipeline(stages=[tokenizer, stopremove, hashingTF, idf, clean_up])

Wall time: 0 ns


## Transform Dataframe

In [27]:
%%time
# Fit and transform the pipeline
cleaner = data_prep_pipeline.fit(tr_spark_df)
cleaned = cleaner.transform(tr_spark_df)


Wall time: 9.61 s


In [28]:
cleaned.show(5)

+--------------------+--------------------+-----+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|           condition|              review|  pos|  neg|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+--------------------+--------------------+-----+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|Left Ventricular ...|It has no side ef...|  0.0|0.136|    1|[it, has, no, sid...|[side, effect,, t...|(262144,[2325,462...|(262144,[2325,462...|(262144,[2325,462...|
|                ADHD|My son is halfway...|0.111|0.018|    1|[my, son, is, hal...|[son, halfway, fo...|(262144,[2050,524...|(262144,[2050,524...|(262144,[2050,524...|
|       Birth Control|I used to take an...|0.083|0.062|    0|[i, used, to, tak...|[used, take, anot...|(262144,[2325,728...|(262144,[2325,728...|(262144,[2325,728...

In [29]:
# show rating and resulting features
#cleaned.select(['label', 'features']).show(truncate=False)

## Run NaiveBayes

In [30]:
%%time
from pyspark.ml.classification import NaiveBayes
# Break data into training and testing set
(training, testing) = cleaned.randomSplit([0.7, 0.3])


Wall time: 1.14 s


In [31]:
%%time
training.show(5)

+--------------------+--------------------+-----+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|           condition|              review|  pos|  neg|label|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+--------------------+--------------------+-----+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|0</span> users fo...|Caused severe agi...|  0.0|0.806|    0|[caused, severe, ...|[caused, severe, ...|(262144,[18713,50...|(262144,[18713,50...|(262144,[18713,50...|
|0</span> users fo...|I LOVE Hypercare!...|0.086|  0.0|    1|[i, love, hyperca...|[love, hypercare!...|(262144,[4081,828...|(262144,[4081,828...|(262144,[4081,828...|
|0</span> users fo...|I am a 9-12 teach...|0.064|0.109|    1|[i, am, a, 9-12, ...|[9-12, teacher., ...|(262144,[5381,139...|(262144,[5381,139...|(262144,[5381,139...

## Create a NaiveBayes model and fit the training data

In [32]:
%%time
nb = NaiveBayes(smoothing=1.0, modelType='multinomial')
predictor = nb.fit(training)

Wall time: 51.6 s


## Transform the model with test_data

In [33]:
# Transform the model with test data
test_results = predictor.transform(testing)
#test_results.show(5)
#test_results['review']

## Predict accuracy of the model

In [34]:
# Use the class evaluator for a cleaner description
# https://spark.apache.org/docs/latest/mllib-naive-bayes.html
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval = MulticlassClassificationEvaluator()
acc = acc_eval.evaluate(test_results)
print("Accuracy of model at predicting reviews was: %f" % acc)

Accuracy of model at predicting reviews was: 0.820921


## Save the model to disk

In [35]:
%%time
# Save and load model
import shutil
output_dir = './myNaiveBayesModel'
shutil.rmtree(output_dir, ignore_errors=True)
predictor.save(output_dir)

Wall time: 7.9 s


## Prediction using New Test data(Online Drug reviews)

## Load the saved model from disk

In [36]:
%%time
# load the model from disk
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
#drug_model = NaiveBayesModel.load(sc,output_dir)

Wall time: 1.25 s


**Test with a new online reviews**

**Defining a function transform the new test string**

In [43]:
def check_review(test_review):
    d = {'review': test_review, 'pos': '', 'neg': ""}
    df = pd.DataFrame(d, index=[0])
    #Analyze Sentiment and Populate DataFrame. 
    analyzer = SentimentIntensityAnalyzer()
    for index, row in df.iterrows():
        scores = analyzer.polarity_scores(row[0])
        print(scores)
        pos = scores['pos']
        neg = scores['neg']
        row[1] = pos
        row[2] = neg
    #Convert to Spark DataFrame.
    sdf = spark.createDataFrame(df)

    #Fit & Clean Spark DataFrame 
    cleaned_sdf = cleaner.transform(sdf)
    cleaned_sdf.show()
    print(type(predictor))

    #Perform Prediction and Print results.
    pc_results = predictor.transform(cleaned_sdf)
    pc_results.select('probability', 'prediction').show(truncate=False)
    print("1 = Seems positive review, 0 = Seems negative feedback")

    response = ""
    prediction = pc_results.select('prediction').rdd.map(lambda row : row[0]).collect()[0]
    probability = pc_results.select('probability').rdd.map(lambda row : row[0]).collect()[0]
    print(pc_results.select('prediction').rdd.map(lambda row : row[0]).collect())
    probability_bad = probability[0]
    probability_good = probability[1]

    if prediction == 0 and probability_bad >= .75:
        response = f"Hey, Seems the user didn't have great experience with this drug as ample evidence of unhappiness Reliability: {round(probability_bad*100)}% "
    elif prediction == 1 and probability_good >= .75:
        response = f"Everything is looking good over here! Reliability: {round(probability_good*100)}% "
    else:
        response = f"Sorry, the analysis is not confident enough to render a reliabile decision with the given text. Please try a longer submission."
            

    return response




## Test Case-1

In [44]:
test_review = 'This medication is fantastic pain has almost disappeared and it also helps with sleep problems. The only thing I have noticed is weight gain, has anyone experienced this?'
print(check_review(test_review))

{'neg': 0.199, 'neu': 0.55, 'pos': 0.251, 'compound': 0.4576}
+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|              review|  pos|  neg|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|This medication i...|0.251|0.199|[this, medication...|[medication, fant...|(262144,[13081,15...|(262144,[13081,15...|(262144,[13081,15...|
+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+

<class 'pyspark.ml.classification.NaiveBayesModel'>
+------------------------------------------+----------+
|probability                               |prediction|
+------------------------------------------+----------+
|[1.1636114342286

## Test Case - 2

In [45]:
test_string = "I have only been taking this medication for a week and I am noticing side effects I don't like. I have not been able to sleep properly. I wake up very frequently thorughout the night. I am currently experiencing flu like symptoms and feel feverish. I need to call my doctor to see if these effects would disappear eventually. I am worried and very unhappy."
print(check_review(test_review))

{'neg': 0.199, 'neu': 0.55, 'pos': 0.251, 'compound': 0.4576}
+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|              review|  pos|  neg|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|This medication i...|0.251|0.199|[this, medication...|[medication, fant...|(262144,[13081,15...|(262144,[13081,15...|(262144,[13081,15...|
+--------------------+-----+-----+--------------------+--------------------+--------------------+--------------------+--------------------+

<class 'pyspark.ml.classification.NaiveBayesModel'>
+------------------------------------------+----------+
|probability                               |prediction|
+------------------------------------------+----------+
|[1.1636114342286

## Test Case -3

In [46]:
test_review = "Some problems with itchy rash but not enough for the doctor to discontinue. I must take with benadryl though."
print(check_review(test_review))

{'neg': 0.259, 'neu': 0.741, 'pos': 0.0, 'compound': -0.5023}
+--------------------+---+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|              review|pos|  neg|          token_text|         stop_tokens|          hash_token|           idf_token|            features|
+--------------------+---+-----+--------------------+--------------------+--------------------+--------------------+--------------------+
|Some problems wit...|0.0|0.259|[some, problems, ...|[problems, itchy,...|(262144,[16332,19...|(262144,[16332,19...|(262144,[16332,19...|
+--------------------+---+-----+--------------------+--------------------+--------------------+--------------------+--------------------+

<class 'pyspark.ml.classification.NaiveBayesModel'>
+------------------------------------------+----------+
|probability                               |prediction|
+------------------------------------------+----------+
|[0.9999999990296005,9.7039