### Sentiment Classification of Goodreads Book Reviews

In [1]:
from pyspark import SparkConf, SparkContext, sql
from pyspark.sql import SparkSession, SQLContext

In [2]:
conf = SparkConf().setAppName("building a warehouse")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv')\
        .options(header='true', inferschema='true')\
        .load('reviews.csv')
        # load('hdfs://localhost/reviews.csv')

In [3]:
import pyspark.sql.functions as f
df = df.withColumn('rating_f', f.round(df.rating.cast('float'), 2))
df = df.withColumn('word_count', f.size(f.split(f.col('review'),' ')))
df = df.filter(f.col('word_count') > 2)

In [4]:
df.groupBy('genre').count().orderBy(f.col('count').desc()).show()

+--------------+-----+
|         genre|count|
+--------------+-----+
|    Nonfiction|  356|
|       Fiction|  120|
|    Historical|  114|
|    Philosophy|   84|
|       History|   77|
|           Art|   75|
|       Mystery|   48|
|       Science|   39|
|Sequential Art|   30|
|      Classics|   24|
|     Childrens|   21|
| Autobiography|   19|
|     Christian|   18|
|    Psychology|   17|
|      Politics|   15|
|   Young Adult|   14|
|       Romance|   14|
|     Biography|   13|
|      Business|   10|
|       Fantasy|    7|
+--------------+-----+
only showing top 20 rows



In [5]:
# Generate sentiments for data
df=df.withColumn('label',f.when(df['rating_f']<3.7,-1).otherwise(1))

In [6]:
# Generate classes for data
from pyspark.ml.feature import Bucketizer
splits = [float('-inf'), 3.50,
          3.51, 3.75,
          3.76, 4.00,
          4.01, 4.25,
          4.26, 4.50,
          4.51, 4.75,
          4.76, float('inf')]
labels = [1, 2, 3, 4, 5, 6, 7]
b=Bucketizer(splits=splits,inputCol='rating_f',outputCol='b_label')
dfs = b.transform(df)

In [7]:
data = df.select(['review', 'label'])

In [8]:
data.printSchema()

root
 |-- review: string (nullable = true)
 |-- label: integer (nullable = false)



In [9]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, \
    HashingTF, IDF, CountVectorizer
regexTokenizer = RegexTokenizer(inputCol="review", outputCol="words", pattern="\\W")
stop_words = ['ourselves', 'hers', 'between', 'yourself',
              'but', 'again', 'there', 'about', 'once', 
              'during', 'out', 'very', 'having', 'with',
              'they', 'own', 'an', 'be', 'some', 'for',
              'do', 'its', 'yours', 'such', 'into', 'of',
              'most', 'itself', 'other', 'off', 'is', 's',
              'am', 'or', 'who', 'as', 'from', 'him',
              'each', 'the', 'themselves', 'until', 'below',
              'are', 'we', 'these', 'your', 'his', 'through',
              'don', 'nor', 'me', 'were', 'her', 'more',
              'himself', 'this', 'down', 'should', 'our',
              'their', 'while', 'above', 'both', 'up',
              'to', 'ours', 'had', 'she', 'all', 'no',
              'when', 'at', 'any', 'before', 'them', 'same',
              'and', 'been', 'have', 'in', 'will', 'on',
              'does', 'yourselves', 'then', 'that', 'because',
              'what', 'over', 'why', 'so', 'can', 'did', 'not',
              'now', 'under', 'he', 'you', 'herself', 'has',
              'just', 'where', 'too', 'only', 'myself', 'which',
              'those', 'i', 'after', 'few', 'whom', 't',
              'being', 'if', 'theirs', 'my', 'against', 'a',
              'by', 'doing', 'it', 'how', 'further', 'was',
              'here', 'than']
stopwordsRemover = StopWordsRemover(inputCol="words",
                                    outputCol="filtered")\
                                    .setStopWords(stop_words)
hashingTF = HashingTF(inputCol="filtered",
                      outputCol="rawFeatures",
                      numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5)
# countVectors = CountVectorizer(inputCol="filtered",
#                                outputCol="features",
#                                vocabSize=10000, minDF=5)

In [10]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover,
                            hashingTF, idf])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)

In [11]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [12]:
# Split data into training and testing sets
(trainingData, testData) = dataset.randomSplit([0.75, 0.25], 1234)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 898
Test Dataset Count: 250


In [13]:
# create the trainer and set its parameters
nb = NaiveBayes(smoothing=1.0, modelType="multinomial")

In [14]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(nb.smoothing, [0.85, 1.0, 1.25])
             .build())
evaluator=MulticlassClassificationEvaluator(predictionCol="prediction")

# Create 10-fold CrossValidator
cv = CrossValidator(estimator=nb, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=10)

cvModel = cv.fit(trainingData)
predictions = cvModel.transform(testData)
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = " + str(accuracy))

Test set accuracy = 0.8276556291390728
