# 1 Load Data

First we load the data from S3. It is stored as a simple CSV file with three columns:
1. product name
2. review text
3. rating (1 - 5)

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

schema =  StructType([
    StructField('name',StringType(),True),
    StructField('review',StringType(), True),
    StructField('rating',StringType(), True),
])

raw_data = spark.read.schema(schema).csv("s3://dimajix-training/data/amazon_baby")
raw_data.limit(5).toPandas()

Unnamed: 0,name,review,rating
0,name,review,rating
1,Planetwise Flannel Wipes,"These flannel wipes are OK, but in my opinion ...",3
2,Planetwise Wipe Pouch,it came early and was not disappointed. i love...,5
3,Annas Dream Full Quilt with 2 Shams,Very soft and comfortable and warmer than it l...,5
4,Stop Pacifier Sucking without tears with Thumb...,This is a product well worth the purchase. I ...,5


## 1.1 Clean and Cache Data

We need to convert the "rating" column to an integer. The conversion cannot succeed for the first record, because that record contains the CSV header, so the cast leaves a null rating there. After converting the data we therefore filter out all records with a null rating, and we also drop records without a review text.

To help distribute the workload, we repartition the DataFrame and also cache it.

In [2]:
data = raw_data.withColumn('rating',col('rating').cast(IntegerType())) \
    .filter(col('rating').isNotNull()) \
    .filter(col('review').isNotNull()) \
    .repartition(31) \
    .cache()

data.limit(5).toPandas()

Unnamed: 0,name,review,rating
0,Cloud b Gentle Giraffe On The Go Travel Sound ...,This is the best item ever to calm your baby t...,5
1,"Evenflo Tribute 5 Convertible Car Seat, Ella",Cheap. Feels cheap too. Doesn't feel sturdy. H...,3
2,"Britax Parkway SGL Booster Seat, Cardinal","Great product, well made, comfortable, and bei...",5
3,Boon Scrubble Interchangeable Bath Toy Squirt ...,These are great. They are a tad bit hard to p...,5
4,Summer Infant By Your Side Sleeper Portable Be...,"""I just purchased this co sleeper, so I will l...",4
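
The cleanup above relies on the cast producing a null for the header row instead of raising an error, which is how Spark behaves when ANSI mode is disabled. A rough plain-Python analogue of the same "cast, then filter" idea is sketched below. The helper `to_int_or_none` and the sample rows are made up for illustration and are not part of the notebook.

```python
def to_int_or_none(value):
    """Mimic a lenient cast: return None instead of raising on bad input."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

# Made-up rows for illustration; the first one plays the role of the CSV header.
raw_rows = [
    ("name", "review", "rating"),
    ("Planetwise Wipe Pouch", "it came early", "5"),
    ("Some product", None, "4"),
]

# Cast the rating, then drop rows with a missing rating or a missing review.
casted = [(name, review, to_int_or_none(rating)) for name, review, rating in raw_rows]
clean_rows = [row for row in casted if row[2] is not None and row[1] is not None]

print(clean_rows)
```

Only the second sample row survives: the header row loses its rating in the cast, and the third row has no review text.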


## 1.2 Split Train Data / Test Data

Now let's do the usual split of our data into a training data set and a test data set. We use 80% of all reviews for training and 20% for testing.

In [3]:
train_data, test_data = data.randomSplit([0.8,0.2], seed=1)

print("train_data: %d" % train_data.count())
print("test_data: %d" % test_data.count())

train_data: 139461
test_data: 34861
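
Note that the two counts are close to, but not exactly, an 80/20 split. That is expected if every row is assigned to a split by an independent random draw rather than by counting rows. The following plain-Python sketch illustrates the idea. The function `random_split` is a hypothetical stand-in, not Spark's implementation, and its output for a given seed will not match Spark's.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by an independent random draw."""
    total = sum(weights)
    # Cumulative upper bounds of the normalized weights, e.g. [0.8, 1.0].
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, bound in enumerate(bounds):
            # The last split catches any rounding leftovers.
            if r < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(10000), [0.8, 0.2], seed=1)
print("train: %d" % len(train))
print("test: %d" % len(test))
```

The split sizes always add up to the number of input rows, while the ratio only approximates the requested weights.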


# 2 Custom Transformers

To work with the text data, we need some custom transformers that are not provided by PySpark. Luckily, we can wrap any Python function into a PySpark ML Transformer, which can then be used directly inside an ML Pipeline.

## 2.1 Implement Transformer for Removing Punctuation

We need a custom Transformer to build the pipeline. The transformer should replace all punctuation characters in a given text column with spaces.

In [4]:
from pyspark.ml import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

def remove_punctuations(text):
    import string
    for c in string.punctuation:
        text = text.replace(c, ' ')
    return text


class PunctuationCleanupTransformer(Transformer):
    def __init__(self, inputCol, outputCol):
        """
        Constructor of PunctuationCleanupTransformer which takes two arguments:
        inputCol - name of input column
        outputCol - name of output column
        """
        super(PunctuationCleanupTransformer, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, dataset):
        """
        Protected _transform method which will be called by the public transform
        method. You should not call this method directly.
        """
        remove_punctuation_udf = udf(remove_punctuations, StringType())
        return dataset.withColumn(self.outputCol, remove_punctuation_udf(self.inputCol))

### Test Transformer

Let's create an instance of the Transformer and test it.

In [5]:
cleaner = PunctuationCleanupTransformer(inputCol='review', outputCol='clean_review')
clean_data = cleaner.transform(data)

clean_data.limit(4).toPandas()

Unnamed: 0,name,review,rating,clean_review
0,Cloud b Gentle Giraffe On The Go Travel Sound ...,This is the best item ever to calm your baby t...,5,This is the best item ever to calm your baby t...
1,"Evenflo Tribute 5 Convertible Car Seat, Ella",Cheap. Feels cheap too. Doesn't feel sturdy. H...,3,Cheap Feels cheap too Doesn t feel sturdy H...
2,"Britax Parkway SGL Booster Seat, Cardinal","Great product, well made, comfortable, and bei...",5,Great product well made comfortable and bei...
3,Boon Scrubble Interchangeable Bath Toy Squirt ...,These are great. They are a tad bit hard to p...,5,These are great They are a tad bit hard to p...
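
The loop in `remove_punctuations` scans the text once per punctuation character. As a possible alternative, the same replacement can be done in a single pass with `str.translate`. The function name `remove_punctuations_fast` is made up here, the `None` guard is an addition that the original function does not have, and whether the difference is noticeable inside a UDF has not been measured.

```python
import string

# Translation table mapping every ASCII punctuation character to a space.
_PUNCT_TO_SPACE = str.maketrans({c: " " for c in string.punctuation})

def remove_punctuations_fast(text):
    """Replace each punctuation character with a space in a single pass."""
    if text is None:
        return None
    return text.translate(_PUNCT_TO_SPACE)

print(remove_punctuations_fast("Cheap. Feels cheap too. Doesn't feel sturdy."))
```

Each punctuation character becomes a space, so "Doesn't" turns into "Doesn t", just as in the `clean_review` column above.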


## 2.2 Implement Transformer for Stemming

We need to stem words, and for doing so we use the Python NLTK library.

In [6]:
from nltk.stem import PorterStemmer

def stem_word(words):
    ps = PorterStemmer()
    return [ps.stem(word) for word in words]


class PorterStemmerTransformer(Transformer):
    def __init__(self, inputCol, outputCol):
        """
        Constructor of PorterStemmerTransformer which takes two arguments:
        inputCol - name of input column
        outputCol - name of output column
        """
        super(PorterStemmerTransformer, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _transform(self, dataset):
        """
        Protected _transform method which will be called by the public transform
        method. You should not call this method directly.
        """
        stem_word_udf = udf(stem_word, ArrayType(StringType()))
        return dataset.withColumn(self.outputCol, stem_word_udf(self.inputCol))

### Test Transformer

Again, we want to test the `PorterStemmerTransformer`. It expects an array of words as input, so we tokenize the reviews first.

In [7]:
from pyspark.ml.feature import *

# First we need to Tokenize each line. In order to perform this task, we implement the following steps
# 1. Instantiate a Tokenizer instance from pyspark.ml.feature
# 2. Transform the raw data using the tokenizer
tokenizer = Tokenizer(inputCol='review', outputCol='words')
tokenized_data = tokenizer.transform(data)

# Then we can instantiate the Stemmer and use it on the words
stemmer = PorterStemmerTransformer(inputCol='words', outputCol='stemmed_review')
stemmed_data = stemmer.transform(tokenized_data)

stemmed_data.limit(4).toPandas()

Unnamed: 0,name,review,rating,words,stemmed_review
0,Cloud b Gentle Giraffe On The Go Travel Sound ...,This is the best item ever to calm your baby t...,5,"[this, is, the, best, item, ever, to, calm, yo...","[thi, is, the, best, item, ever, to, calm, you..."
1,"Evenflo Tribute 5 Convertible Car Seat, Ella",Cheap. Feels cheap too. Doesn't feel sturdy. H...,3,"[cheap., feels, cheap, too., doesn't, feel, st...","[cheap., feel, cheap, too., doesn't, feel, stu..."
2,"Britax Parkway SGL Booster Seat, Cardinal","Great product, well made, comfortable, and bei...",5,"[great, product,, well, made,, comfortable,, a...","[great, product,, well, made,, comfortable,, a..."
3,Boon Scrubble Interchangeable Bath Toy Squirt ...,These are great. They are a tad bit hard to p...,5,"[these, are, great., , they, are, a, tad, bit,...","[these, are, great., , they, are, a, tad, bit,..."
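
The `words` column above still contains tokens such as `cheap.` and even empty strings. Both follow from how the tokenizer works: it lowercases the text and splits on individual whitespace characters, so punctuation stays attached and two consecutive spaces produce an empty token. The sketch below approximates that behaviour in plain Python. The function `tokenize` is an illustration under that assumption, not Spark's implementation.

```python
import re

def tokenize(text):
    """Approximate the tokenizer: lowercase, then split on single whitespace characters."""
    tokens = re.split(r"\s", text.lower())
    # Drop trailing empty strings, as Java's String.split does by default.
    while tokens and tokens[-1] == "":
        tokens.pop()
    return tokens

# Punctuation stays attached, and the double space yields an empty token.
print(tokenize("These are great.  They are a tad bit hard"))

# After punctuation cleanup the words are clean, but more empty tokens appear.
print(tokenize("Cheap  Feels cheap too  Doesn t"))
```

This is why the pipeline later cleans punctuation before tokenizing, and why empty strings show up in its `words` and `ngrams` columns.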


# 3 Create Prediction Model

Now we have all parts and helpers in place to create a predictive model using a PySpark ML Pipeline.

## 3.1 Create ML Pipeline

We now have all components for creating an initial ML Pipeline. The pipeline uses the following components:

* PunctuationCleanupTransformer - remove punctuations from reviews
* Tokenizer - for splitting reviews into words
* StopWordsRemover - for removing stop words
* PorterStemmerTransformer - for stemming words
* NGram - for creating n-grams (we'll use three words per n-gram)
* CountVectorizer - for creating bag-of-words features from the n-grams
* IDF - for creating TF-IDF features from the n-gram counts
* LogisticRegression - for creating the real model
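
To make the n-gram and counting steps more concrete, here is a small plain-Python sketch of what they compute for a single review. The helper `ngrams` and the sample stems are made up for illustration. The real pipeline uses the PySpark `NGram` and `CountVectorizer` stages instead.

```python
from collections import Counter

def ngrams(tokens, n):
    """Join every run of n consecutive tokens with a single space."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

# Made-up stemmed tokens of one review.
stems = ["great", "product", "good", "qualiti", "great", "product", "good"]

# Three words per n-gram.
trigrams = ngrams(stems, 3)
print(trigrams)

# Term frequencies of the n-grams within this one review.
counts = Counter(trigrams)
print(counts["great product good"])
```

A review with fewer than three tokens produces no n-grams at all in this sketch.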

You also need to transform the incoming rating (1-5) into a sentiment (0 or 1), and you need to drop reviews with a rating of 3. This can be done using one or more SQLTransformer instances. Inside a SQLTransformer instance you simply write SQL code and access the current DataFrame via `__THIS__`.

In [8]:
from pyspark.ml.feature import *
from pyspark.ml.classification import *

# Use Spark's built-in list of English stop words
stopWords = StopWordsRemover.loadDefaultStopWords("english")

stages = [
    PunctuationCleanupTransformer(inputCol='review', outputCol='clean_review'),
    SQLTransformer(statement='SELECT *,CASE WHEN rating < 3 THEN 0.0 ELSE 1.0 END AS sentiment FROM __THIS__ WHERE rating <> 3'),
    Tokenizer(inputCol='clean_review', outputCol='words'),
    StopWordsRemover(inputCol='words', outputCol='vwords', stopWords=stopWords),
    PorterStemmerTransformer(inputCol='vwords', outputCol='stems'),
    NGram(inputCol='stems', outputCol='ngrams', n=3),
    CountVectorizer(inputCol='ngrams', outputCol='tf', minDF=2.0),
    IDF(inputCol='tf', outputCol='features'),
    LogisticRegression(featuresCol='features',labelCol='sentiment')
]
pipe = Pipeline(stages = stages)
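
The SQL statement in the second stage does two things at once: it drops neutral reviews and derives the label. The same rule in plain Python looks roughly like this. The function `to_sentiment` is a hypothetical helper used only for illustration.

```python
def to_sentiment(rating):
    """Mirror the CASE expression; None marks a row that the WHERE clause drops."""
    if rating == 3:
        return None
    return 0.0 if rating < 3 else 1.0

ratings = [1, 2, 3, 4, 5]
labels = [s for s in (to_sentiment(r) for r in ratings) if s is not None]
print(labels)
```

Ratings 1 and 2 become the negative class, ratings 4 and 5 the positive class, and rating 3 disappears from the data.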

## 3.2 Fit Pipeline Model
Using the training data, we create a PipelineModel by fitting the Pipeline.

In [9]:
model = pipe.fit(train_data)

## 3.3 Predict Data

Let us make some predictions for the test data using the model.

In [10]:
pred = model.transform(test_data)

pred.limit(10).toPandas()

Unnamed: 0,name,review,rating,clean_review,sentiment,words,vwords,stems,ngrams,tf,features,rawPrediction,probability,prediction
0,,I LOVE the CubeIt! FunBites!!!! This is perfec...,5,I LOVE the CubeIt FunBites This is perfec...,1.0,"[i, love, the, cubeit, , funbites, , , , , thi...","[love, cubeit, , funbites, , , , , perfect, , ...","[love, cubeit, , funbit, , , , , perfect, , es...","[love cubeit , cubeit funbit, funbit , funbi...","(2.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(3.658600720360723, 0.0, 0.0, 0.0, 0.0, 0.0, 0...","[-76805.93013939369, 76805.93013939369]","[0.0, 1.0]",1.0
1,,I'm all for being 'green'; this bag is perfect...,5,I m all for being green this bag is perfect...,1.0,"[i, m, all, for, being, , green, , , this, bag...","[m, , green, , , bag, perfect, holding, sandwi...","[m, , green, , , bag, perfect, hold, sandwich,...","[m green, green , green , bag, bag perfe...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-64115.51841211731, 64115.51841211731]","[0.0, 1.0]",1.0
2,#1 Adjustable Back Seat Baby Safety Mirror - E...,Great product and very good quality! Fits easi...,5,Great product and very good quality Fits easi...,1.0,"[great, product, and, very, good, quality, , f...","[great, product, good, quality, , fits, easily...","[great, product, good, qualiti, , fit, easili,...","[great product good, product good qualiti, goo...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-102710.26169347999, 102710.26169347999]","[0.0, 1.0]",1.0
3,#76 Hot Pink baby leg warmers for baby or girl...,these leg-warmers are great! they fit a bit ro...,5,these leg warmers are great they fit a bit ro...,1.0,"[these, leg, warmers, are, great, , they, fit,...","[leg, warmers, great, , fit, bit, roomy, 4, mo...","[leg, warmer, great, , fit, bit, roomi, 4, mon...","[leg warmer great, warmer great , great fit, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-54333.99940256587, 54333.99940256587]","[0.0, 1.0]",1.0
4,&quot;Sweet Pink Sherbet Hoodie Towel&quot; --...,Bought this as a gift for my 3yr old granddaug...,4,Bought this as a gift for my 3yr old granddaug...,1.0,"[bought, this, as, a, gift, for, my, 3yr, old,...","[bought, gift, 3yr, old, granddaughter, , , dr...","[bought, gift, 3yr, old, granddaught, , , dri,...","[bought gift 3yr, gift 3yr old, 3yr old grandd...","(1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(1.8293003601803615, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-119882.90976526483, 119882.90976526483]","[0.0, 1.0]",1.0
5,- \t New Umbra Bungee Wallet Business Card Cas...,Big and bulky. The bungee lid is awkward. Stic...,1,Big and bulky The bungee lid is awkward Stic...,0.0,"[big, and, bulky, , the, bungee, lid, is, awkw...","[big, bulky, , bungee, lid, awkward, , stick, ...","[big, bulki, , bunge, lid, awkward, , stick, t...","[big bulki , bulki bunge, bunge lid, bunge l...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-2894.584733256163, 2894.584733256163]","[0.0, 1.0]",1.0
6,10/pk - Enfamil Disposable Slow-Flow Soft Nipples,I love these nipples and they make it very eas...,5,I love these nipples and they make it very eas...,1.0,"[i, love, these, nipples, and, they, make, it,...","[love, nipples, make, easy, baby, drink, wonde...","[love, nippl, make, easi, babi, drink, wonder,...","[love nippl make, nippl make easi, make easi b...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-48687.08428005411, 48687.08428005411]","[0.0, 1.0]",1.0
7,10PC BLACK WHITE &amp; RED CRIB NURSERY BEDDIN...,"I washed and dried the set, as instructed, and...",4,I washed and dried the set as instructed and...,1.0,"[i, washed, and, dried, the, set, , as, instru...","[washed, dried, set, , instructed, , white, ma...","[wash, dri, set, , instruct, , white, materi, ...","[wash dri set, dri set , set instruct, instr...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-40835.45516427567, 40835.45516427567]","[0.0, 1.0]",1.0
8,12 Pack Fuzzi Bunz One-Size Cloth Diapers GEND...,I ordered a 12-pack of Fuzzi-Bunz One Size dia...,4,I ordered a 12 pack of Fuzzi Bunz One Size dia...,1.0,"[i, ordered, a, 12, pack, of, fuzzi, bunz, one...","[ordered, 12, pack, fuzzi, bunz, one, size, di...","[order, 12, pack, fuzzi, bunz, one, size, diap...","[order 12 pack, 12 pack fuzzi, pack fuzzi bunz...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[241064.05818129654, -241064.05818129654]","[1.0, 0.0]",0.0
9,1st Bday Girl Autograph Bear,I loved the idea of an autograph teddy bear th...,4,I loved the idea of an autograph teddy bear th...,1.0,"[i, loved, the, idea, of, an, autograph, teddy...","[loved, idea, autograph, teddy, bear, daughter...","[love, idea, autograph, teddi, bear, daughter,...","[love idea autograph, idea autograph teddi, au...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","[-149718.1768678998, 149718.1768678998]","[0.0, 1.0]",1.0
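
Two columns of this output are easier to read with a little arithmetic. The `features` column holds the term frequencies from `tf` scaled by a per-term IDF weight, and the `probability` column is the logistic function applied to the margins in `rawPrediction`. The sketch below checks both against numbers copied from the rows above. The IDF weight is read off the row where `tf` is 1.0, and the helper `sigmoid` is written here for illustration.

```python
import math

def sigmoid(x):
    """Numerically stable logistic function."""
    if x >= 0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)

# First vector entry: tf = 2.0 in row 0, and a weight of about 1.8293
# as seen in row 4, where tf = 1.0.
tf_value = 2.0
idf_weight = 1.8293003601803615
print(tf_value * idf_weight)

# Margin of the positive class in row 0 (second entry of rawPrediction).
margin = 76805.93013939369
probability = [sigmoid(-margin), sigmoid(margin)]
print(probability)
```

The margins are so large in magnitude that the probabilities saturate at exactly 0.0 and 1.0 in floating point, which is why every row shows `[0.0, 1.0]` or `[1.0, 0.0]`.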


## 3.4 Model Evaluation
As in the original exercise, we want to use a custom metric for assessing the performance.

In [11]:
from pyspark.ml.evaluation import *

class AccuracyClassificationEvaluator(Evaluator):
    def __init__(self, predictionCol='prediction', labelCol='label'):
        super(AccuracyClassificationEvaluator, self).__init__()
        self.predictionCol = predictionCol
        self.labelCol = labelCol
    
    def _evaluate(self, dataset):
        num_total = dataset.count()
        num_correct = dataset.filter(col(self.labelCol) == col(self.predictionCol)).count()
        accuracy = float(num_correct) / num_total
        return accuracy

## Assess Performance

With the evaluator we can assess the performance of the prediction and easily compare it to a simple model which always predicts 'positive'.

In [12]:
print("Num positive reviews: %d" % pred.filter(pred.sentiment > 0.5).count())
print("Num negative reviews: %d" % pred.filter(pred.sentiment < 0.5).count())

Num positive reviews: 26774
Num negative reviews: 4923


In [13]:
always_positive = pred.withColumn('prediction',lit(1.0))

evaluator = AccuracyClassificationEvaluator(predictionCol='prediction', labelCol='sentiment')

print("Model Accuracy = %f" % evaluator.evaluate(pred))
print("Baseline Accuracy = %f" % evaluator.evaluate(always_positive))

Model Accuracy = 0.878600
Baseline Accuracy = 0.844686
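
The baseline number can be reproduced from the class counts printed earlier, because a model that always predicts "positive" is correct exactly for the positive reviews. Here is a plain-Python sketch of the same accuracy metric. The function `accuracy` is a stand-in for the evaluator, written only for illustration.

```python
def accuracy(labels, predictions):
    """Fraction of rows where the prediction equals the label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

# Class counts taken from the output of the cell above.
num_positive = 26774
num_negative = 4923

labels = [1.0] * num_positive + [0.0] * num_negative
always_positive = [1.0] * len(labels)

baseline = accuracy(labels, always_positive)
print("Baseline Accuracy = %f" % baseline)
```

This is simply 26774 / (26774 + 4923). The trained model therefore improves on the baseline by about 3.4 percentage points, which is worth keeping in mind given how imbalanced the two classes are.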
