In [1]:
import os
os.environ["JAVA_HOME"] = "C:\Program Files\Java\jdk1.8.0_271"
os.environ["HADOOP_HOME"] = "C:\Installations\Hadoop"
os.environ["SPARK_HOME"] = "D:\spark-2.4.5-bin-hadoop2.7\spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
# Import dependencies
from pyspark import SparkFiles
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.regression import RandomForestRegressor

In [3]:
# Create builder for SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .master("local[*]") \
    .config("spark.driver.memory", "10g") \
    .appName("CloudETLProject") \
    .getOrCreate()

In [4]:
# Read csv
import pandas as pd
pd_test = pd.read_csv('../cleaned_nlp_data/testfinal.csv', sep=',')
pd_train = pd.read_csv('../cleaned_nlp_data/trainfinal.csv', sep=',')

In [5]:
# Create a PySpark schema
mySchema = StructType([ StructField("uniqueID", StringType(), True)\
                       ,StructField("drugName", StringType(), True)\
                       ,StructField("condition", StringType(), True)\
                       ,StructField("review", StringType(), True)\
                       ,StructField("rating", IntegerType(), True)\
                       ,StructField("date", StringType(), True)\
                       ,StructField("usefulCount", StringType(), True)\
                     ])

In [6]:
# Create PySpark dataframes
from pyspark.sql import SQLContext
from pyspark import SparkConf, SparkContext
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

df_test = sqlContext.createDataFrame(pd_test, schema=mySchema)
df_train = sqlContext.createDataFrame(pd_train, schema=mySchema)

In [7]:
# Drop columns that won't be used
drop_df = df_test.drop('uniqueID','drugName','condition', 'date', 'usefulCount' ).collect()
drop_df_two = df_train.drop('uniqueID','drugName','condition', 'date', 'usefulCount' ).collect()

In [8]:
# Show test dataframe with dropped columns
test_df = spark.createDataFrame(drop_df)
test_df.show()

+--------------------+------+
|              review|rating|
+--------------------+------+
|gave me rapid hea...|     0|
|    it cured my mrsa|     1|
|i have been on zy...|     1|
|it didnt work as ...|     1|
|i have had  major...|     1|
|i had mrsa inf la...|     1|
|i got a mrsa stap...|     1|
|very satisfied wi...|     1|
|effectiveness las...|     0|
|my psa was going ...|     1|
|on zytiga for  mo...|     1|
|began zytiga with...|     1|
|had tried clariti...|     1|
|this medicine wor...|     1|
|i have had cholin...|     1|
|after travelling ...|     1|
|i suffered from m...|     1|
|i recently had te...|     1|
|it works great fo...|     1|
|had hives nearly ...|     1|
+--------------------+------+
only showing top 20 rows



In [9]:
# Show train dataframe with dropped columns
train_df = spark.createDataFrame(drop_df_two)
train_df.show()

+--------------------+------+
|              review|rating|
+--------------------+------+
|it has no side ef...|     1|
|my son is halfway...|     1|
|i used to take an...|     0|
|this is my first ...|     1|
|suboxone has comp...|     1|
|nd day on mg star...|     0|
|he pulled out but...|     0|
|abilify changed m...|     1|
| i ve had  nothin...|     0|
|i had been on the...|     1|
|i have been on th...|     1|
|i have taken anti...|     1|
|i had crohns with...|     0|
|have a little bit...|     0|
|started nexplanon...|     0|
|i have been takin...|     1|
|this drug worked ...|     1|
|ive been taking a...|     1|
|ive been on every...|     1|
|i have been on ta...|     1|
+--------------------+------+
only showing top 20 rows



In [18]:
# Create pipeline
def build_trigrams(inputCol=["review","rating"], n=3):

# Tokenizer converts input text into a stream of tokens, where each token is a separate word
    tokenizer = [Tokenizer(inputCol="review", outputCol="words")]

# Get rid of stop words
    stopremove = [StopWordsRemover(inputCol='words',outputCol='stop_tokens')]

# Stem the words

# Creates a column for every word, two and three words (n=3). Looks for sequences of words from a given sample of text or speech. 
    ngrams = [
        NGram(n=i, inputCol="stop_tokens", outputCol="{0}_grams".format(i))
        for i in range(1, n + 1)
    ]

# Count vectorizer converts a collection of text (i.e review rows) to a vector term or token counts. Counts the number of times a word repeats. 
    cv = [
        CountVectorizer(vocabSize=2**15,inputCol="{0}_grams".format(i),
            outputCol="{0}_tf".format(i))
        for i in range(1, n + 1)
    ]

# Inverse document frequency scales down the term weights of terms with high collection frequency. If a word appears very often, the weight of that word is decreased. (i.e the words drug/medication appears often because this is a dataset about those things)
    idf = [IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5) for i in range(1, n + 1)]

# Vector assembler combines raw features and features into a single feature vector. We are using it to combine cv and idf. 
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in range(1, n + 1)],
        outputCol="features"
    )]
# String indexer converts text to numeric. 
    label_stringIdx = [StringIndexer(inputCol = "rating", outputCol = "label")]

# Logistic regression is the model being used. Using logistic regression because we are taking in two values (binary). For our purposes it will be 0 and 1 (negative/positive). 
    rf = [RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=1)]
    return Pipeline(stages=tokenizer + stopremove + ngrams + cv + idf + assembler + label_stringIdx + rf)

In [19]:
# run the model 
trigram_pipelineFit = build_trigrams().fit(train_df)
test_results = trigram_pipelineFit.transform(test_df)

In [20]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator()

# Accuracy might not always be the right measure, especially if target class is not balanced. 
accuracy = test_results.filter(test_results.label == test_results.prediction).count() / float(test_results.count())
# Apply a confusion matrix for TP, TN and FN. 
roc_auc = evaluator.evaluate(test_results)
print("Accuracy Score: {0:.4f}".format(accuracy))
print("ROC-AUC: {0:.4f}".format(roc_auc))

Accuracy Score: 0.7157
ROC-AUC: 0.5332


In [21]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = test_results.select(['label']).collect()
y_pred = test_results.select(['prediction']).collect()

In [22]:
print(classification_report(y_true, y_pred))

              precision    recall  f1-score   support

         0.0       0.72      0.96      0.82     37559
         1.0       0.61      0.15      0.24     16207

    accuracy                           0.72     53766
   macro avg       0.67      0.56      0.53     53766
weighted avg       0.69      0.72      0.65     53766



In [23]:
print(confusion_matrix(y_true, y_pred))

[[35998  1561]
 [13727  2480]]


In [24]:
train_df.groupBy("rating").count().orderBy(desc("count")).show()

+------+------+
|rating| count|
+------+------+
|     1|113209|
|     0| 48088|
+------+------+



In [13]:
# Save the model
# trigram_pipelineFit.save("random_forest_eval")