# Setup

First obtain dependencies

In [1]:
# Pyspark SQL
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf

In [2]:
# pyspark
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import MinMaxScaler, VectorAssembler, HashingTF, IDF, RegexTokenizer, StopWordsRemover, Bucketizer, QuantileDiscretizer
from pyspark.ml.linalg import Vectors

In [3]:
# Install a pip package (langdetect) in the current Jupyter kernel
import sys
!{sys.executable} -m pip install langdetect

[33mYou are using pip version 8.1.2, however version 20.0.2 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [4]:
%load_ext autoreload
%autoreload 2

Fill in the following data

In [5]:
# Login information
# username = # AWS Username
# password = # AWS Password
# region = "us-east-1" # Change if different from your AWS region


# Dataset location
# s3 = #s3a address

Connect to the AWS resources

In [6]:
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.awsAccessKeyId", username)
sc._jsc.hadoopConfiguration().set("fs.s3a.awsSecretAccessKey", password)
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3." + region + ".amazonaws.com")

In [7]:
df = sqlContext.read.format("csv").option("header", "true").load(s3)

# Project

In [8]:
rdd = df.rdd.map(tuple)

In [9]:
# small = sc.parallelize(rdd.take(10))

## Preprocessing

Removes non-English data from the dataset

In [10]:
from langdetect import detect

def remove_nonenglish(row):
    '''
    Removes records that have invalid descriptions from the dataframe
    Input: dataframe
    Output: Cleaned up dataframe
    '''
    try:
        lang=detect(row[1])
        if (lang == 'en'): 
            return True
        else: 
            return False
    except:
        return False

def replace_punc_with_space(desc):
    
    upd_desc=''
    
    for i in range(len(desc)-1):
        upd_desc+=desc[i]
        if desc[i].islower() and desc[i+1].isupper():
            upd_desc+=' '
    
    upd_desc+=desc[-1]
    return upd_desc 

def remove_punc(row):
    desc = row[1]
    
    desc=replace_punc_with_space(desc)    
    desc=desc.lower() 
    desc = "".join([" " if char in ['.', ',', '?', '!', '(', ')', '/', ';', ':'] else char for char in desc])
    desc = "".join(["" if char in ['\''] else char for char in desc])
    
    lst = list(row)
    lst[1] = desc
    tup = tuple(lst)

    return tup


In [11]:
# small = small.filter(remove_nonenglish).map(remove_punc)

Genres converted to array

In [12]:
def genre_to_array(row):
    genres = row[10]
    glist = []
    
    if(genres is not None): glist = genres.split('|')
    
    lst = list(row)
    lst[10] = glist
    tup = tuple(lst)

    return tup

Applies the above processes to rdd

In [13]:
rdd = rdd.filter(remove_nonenglish).map(remove_punc).map(genre_to_array)

In [14]:
print(rdd.take(1))

[('Kristin Hannah', 'alaska  1974 unpredictable  unforgiving  untamed for a family in crisis  the ultimate test of survival ernt allbright  a former pow  comes home from the vietnam war a changed and volatile man  when he loses yet another job  he makes an impulsive decision  he will move his family north  to alaska  where they will live off the grid in america’s last true frontier thirteen-year-old leni  a girl coming of age in a tumultuous time  caught in the riptide of her parents’ passionate  stormy relationship  dares to hope that a new land will lead to a better future for her family  she is desperate for a place to belong  her mother  cora  will do anything and go anywhere for the man she loves  even if it means following him into the unknown at first  alaska seems to be the answer to their prayers  in a wild  remote corner of the state  they find a fiercely independent community of strong men and even stronger women  the long  sunlit days and the generosity of the locals make u

Convert data to dataframe with header names and cast datatypes

In [15]:
booksdf = rdd.toDF(['author', 'description', 'edition', 'format', 
                    'isbn13', 'pages', 'rating', 'ratingCount', 
                    'review_count', 'title', 'genres', 'image_url']) \
    .drop("edition") \
    .drop("format") \
    .drop("pages") \
    .drop("isbn13") \
    .drop("review_count") \
    .drop("image_url")

booksdf = booksdf.withColumn("rating", booksdf["rating"].cast("decimal(3,2)")) \
                .withColumn("ratingCount", booksdf["ratingCount"].cast("long"))
booksdf.printSchema()

root
 |-- author: string (nullable = true)
 |-- description: string (nullable = true)
 |-- rating: decimal(3,2) (nullable = true)
 |-- ratingCount: long (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)



Drop all elements with a null description or rating

In [16]:
booksdf = booksdf.dropna(subset=('description', 'rating'))

### Bucket

https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.feature.QuantileDiscretizer

One method is with **QuantileDiscretizer**, which buckets by frequency (equal number in each bucket).

Adjust **numBuckets** to change the bucket sizes for the ratings and get different outcomes. The lower number of buckets, the more book ratings per bucket.

Sample code:

```
discretizer = QuantileDiscretizer(numBuckets=16, inputCol='rating', outputCol='label')
booksdf = discretizer.fit(booksdf).transform(booksdf)
```

https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.feature.Bucketizer

Alternative is to use **Bucketizer**, which buckets by length (or range). Adjust the numbers in the range to fix the outcome.

Sample code:

```
splitQuarter = [0,0.25,0.5,0.75,1,1.25,1.5,1.75,2,2.25,2.5,2.75,3,3.25,3.5,3.75,4,4.25,4.5,4.75,5]
splitHalf = [0,0.5,1,1.5,2,2.5,3,3.5,4,4.5,5]
bucketizer = Bucketizer(splits=split, inputCol='rating', outputCol='label')
booksdf = bucketizer.transform(booksdf)
```

In [17]:
split = [0,0.5,1,1.5,2,2.5,3,3.5,4,4.5,5]
bucketizer = Bucketizer(splits=split, inputCol='rating', outputCol='label')
booksdf = bucketizer.transform(booksdf)

Convert the number of ratings into weight for Naive Bayes classification.

In [18]:
assembler = VectorAssembler(inputCols=["ratingCount"], outputCol="countVec")
scaler = MinMaxScaler(inputCol='countVec', outputCol='weightVec')

In [19]:
pipeline = Pipeline(stages=[assembler, scaler])
booksdf = pipeline.fit(booksdf).transform(booksdf)

In [20]:
def ith_(v, i):
    try:
        return float(v[i])
    except ValueError:
        return None

ith = udf(ith_, DoubleType())
booksdf = booksdf.withColumn("weight", ith("weightVec", lit(0)))

In [21]:
booksdf[("rating", "label", "weight")].show(10)

+------+-----+-------------------+
|rating|label|             weight|
+------+-----+-------------------+
|  4.33|  8.0| 0.8339395595325512|
|  4.34|  8.0| 0.3492545979268772|
|  4.18|  8.0|0.35802068569021556|
|  3.97|  7.0|                1.0|
|  4.25|  8.0|0.32325233240549417|
|  4.01|  8.0| 0.5209219192039937|
|  3.85|  7.0| 0.5113589143712609|
|  4.53|  9.0|0.12990886684084996|
|  3.92|  7.0|  0.369775212463783|
|  3.73|  7.0| 0.3518616551967531|
+------+-----+-------------------+
only showing top 10 rows



### TF/IDF

In [22]:
tokenizer = RegexTokenizer(inputCol="description", outputCol="descToken", pattern="\\W")
booksdf = tokenizer.transform(booksdf)

swremover = StopWordsRemover(inputCol="descToken", outputCol="desc")
booksdf = swremover.transform(booksdf)

booksdf[("description", "descToken", "desc")].show(10)

+--------------------+--------------------+--------------------+
|         description|           descToken|                desc|
+--------------------+--------------------+--------------------+
|alaska  1974 unpr...|[alaska, 1974, un...|[alaska, 1974, un...|
|in the house of h...|[in, the, house, ...|[house, helios, g...|
|of course i want ...|[of, course, i, w...|[course, want, li...|
|anna fox lives al...|[anna, fox, lives...|[anna, fox, lives...|
|they killed my mo...|[they, killed, my...|[killed, mother, ...|
|newlyweds celesti...|[newlyweds, celes...|[newlyweds, celes...|
|when you read thi...|[when, you, read,...|[read, book, make...|
|rowan has gone ro...|[rowan, has, gone...|[rowan, gone, rog...|
|hope warms the co...|[hope, warms, the...|[hope, warms, col...|
|if you knew the d...|[if, you, knew, t...|[knew, date, deat...|
+--------------------+--------------------+--------------------+
only showing top 10 rows



https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.feature.HashingTF

**numFeatures should be adjusted to get better outcomes**

> Since a simple modulo is used to transform the hash function to a column index, it is advisable to **use a power of two** as the numFeatures parameter; otherwise the features will not be mapped evenly to the columns.

In [23]:
hashingTF = HashingTF(inputCol="desc", outputCol="rawFeatures", numFeatures=32)
featurizedData = hashingTF.transform(booksdf)

In [24]:
featurizedData.select('desc', 'rawFeatures').show(10)

+--------------------+--------------------+
|                desc|         rawFeatures|
+--------------------+--------------------+
|[alaska, 1974, un...|(32,[0,1,2,3,4,5,...|
|[house, helios, g...|(32,[0,1,2,3,4,5,...|
|[course, want, li...|(32,[0,1,2,3,4,5,...|
|[anna, fox, lives...|(32,[1,2,3,5,6,7,...|
|[killed, mother, ...|(32,[0,1,2,4,5,6,...|
|[newlyweds, celes...|(32,[1,2,4,5,6,7,...|
|[read, book, make...|(32,[0,1,2,3,5,6,...|
|[rowan, gone, rog...|(32,[0,1,2,3,4,5,...|
|[hope, warms, col...|(32,[0,2,3,5,6,7,...|
|[knew, date, deat...|(32,[1,2,3,4,5,6,...|
+--------------------+--------------------+
only showing top 10 rows



In [25]:
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [27]:
rescaledData.select("title", "label", "features").show(10)

+--------------------+-----+--------------------+
|               title|label|            features|
+--------------------+-----+--------------------+
|     The Great Alone|  8.0|(32,[0,1,2,3,4,5,...|
|               Circe|  8.0|(32,[0,1,2,3,4,5,...|
|    The Cruel Prince|  8.0|(32,[0,1,2,3,4,5,...|
|The Woman in the ...|  7.0|(32,[1,2,3,5,6,7,...|
|Children of Blood...|  8.0|(32,[0,1,2,4,5,6,...|
|An American Marriage|  8.0|(32,[1,2,4,5,6,7,...|
| The Wife Between Us|  7.0|(32,[0,1,2,3,5,6,...|
|         Thunderhead|  9.0|(32,[0,1,2,3,4,5,...|
|A Court of Frost ...|  7.0|(32,[0,2,3,5,6,7,...|
|    The Immortalists|  7.0|(32,[1,2,3,4,5,6,...|
+--------------------+-----+--------------------+
only showing top 10 rows



### Training and Testing datsets

In [28]:
(trainingData, testData) = rescaledData.randomSplit([0.7, 0.3])

In [29]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

### Descision Tree Classification

https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.classification.DecisionTreeClassifier

In [39]:
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", cacheNodeIds = True, checkpointInterval = 10, impurity = 'entropy')
dtModel = dt.fit(trainingData)

In [40]:
dtPredictions = dtModel.transform(testData)
dtPredictions.select("prediction", "label", "features").show(10)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       8.0|  9.0|(32,[3,5,7,8,9,11...|
|       8.0|  8.0|(32,[1,2,3,4,5,6,...|
|       7.0|  8.0|(32,[0,1,2,3,4,5,...|
|       8.0|  7.0|(32,[0,1,2,3,4,5,...|
|       8.0|  7.0|(32,[0,1,2,3,5,6,...|
|       8.0|  8.0|(32,[0,1,2,3,4,5,...|
|       7.0|  8.0|(32,[0,1,2,3,4,5,...|
|       8.0|  7.0|(32,[0,2,3,5,6,7,...|
|       7.0|  8.0|(32,[0,1,2,3,4,5,...|
|       8.0|  8.0|(32,[0,3,4,5,6,7,...|
+----------+-----+--------------------+
only showing top 10 rows



In [41]:
dtAccuracy = evaluator.evaluate(dtPredictions)
print(dtAccuracy)

0.47126436781609193


In [31]:
dtPredictions = dtModel.transform(testData)
dtPredictions.select("prediction", "label", "features").show(10)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       8.0|  9.0|(32,[3,5,7,8,9,11...|
|       7.0|  8.0|(32,[1,2,3,4,5,6,...|
|       8.0|  8.0|(32,[0,1,2,3,4,5,...|
|       8.0|  7.0|(32,[0,1,2,3,4,5,...|
|       8.0|  7.0|(32,[0,1,2,3,5,6,...|
|       8.0|  8.0|(32,[0,1,2,3,4,5,...|
|       8.0|  8.0|(32,[0,1,2,3,4,5,...|
|       7.0|  7.0|(32,[0,2,3,5,6,7,...|
|       7.0|  8.0|(32,[0,1,2,3,4,5,...|
|       8.0|  8.0|(32,[0,3,4,5,6,7,...|
+----------+-----+--------------------+
only showing top 10 rows



In [32]:
dtAccuracy = evaluator.evaluate(dtPredictions)
print(dtAccuracy)

0.46360153256704983


### Random Forest Classification

https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.classification.RandomForestClassifier

In [33]:
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)
rfModel = rf.fit(trainingData)

In [34]:
rfPredictions = rfModel.transform(testData)
rfPredictions.select("prediction", "label", "features").show(10)

+----------+-----+--------------------+
|prediction|label|            features|
+----------+-----+--------------------+
|       7.0|  9.0|(32,[3,5,7,8,9,11...|
|       8.0|  8.0|(32,[1,2,3,4,5,6,...|
|       8.0|  8.0|(32,[0,1,2,3,4,5,...|
|       7.0|  7.0|(32,[0,1,2,3,4,5,...|
|       8.0|  7.0|(32,[0,1,2,3,5,6,...|
|       8.0|  8.0|(32,[0,1,2,3,4,5,...|
|       8.0|  8.0|(32,[0,1,2,3,4,5,...|
|       8.0|  7.0|(32,[0,2,3,5,6,7,...|
|       8.0|  8.0|(32,[0,1,2,3,4,5,...|
|       7.0|  8.0|(32,[0,3,4,5,6,7,...|
+----------+-----+--------------------+
only showing top 10 rows



In [35]:
rfAccuracy = evaluator.evaluate(rfPredictions)
print(rfAccuracy)

0.5325670498084292


### Naive Bayes Multinomial Classification

https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.classification.NaiveBayes

In [None]:
nb = NaiveBayes(labelCol="label", featuresCol="features", modelType="bernoulli", weightCol="weight")
nbModel = nb.fit(trainingData)

In [None]:
nbPredictions = nbModel.transform(testData)
nbPredictions.select("prediction", "label", "features").show(10)

In [None]:
nbAccuracy = evaluator.evaluate(nbPredictions)
print(nbAccuracy)

In [None]:
nb = NaiveBayes(labelCol="label", featuresCol="features", modelType="multinomial", weightCol="weight")
nbModel = nb.fit(trainingData)

In [None]:
nbPredictions = nbModel.transform(testData)
nbPredictions.select("prediction", "label", "features").show(10)

In [None]:
nbAccuracy = evaluator.evaluate(nbPredictions)
print(nbAccuracy)